http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/CopyTool.java ---------------------------------------------------------------------- diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/CopyTool.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/CopyTool.java new file mode 100644 index 0000000..067ff49 --- /dev/null +++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/CopyTool.java @@ -0,0 +1,1003 @@ +package mvm.rya.accumulo.mr.merge; + +/* + * #%L + * mvm.rya.accumulo.mr.merge + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.nio.file.Paths; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Date; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.client.mapreduce.AbstractInputFormat; +import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat; +import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; +import org.apache.accumulo.core.client.mapreduce.AccumuloMultiTableInputFormat; +import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; +import org.apache.accumulo.core.client.mapreduce.InputTableConfig; +import org.apache.accumulo.core.client.mapreduce.lib.partition.KeyRangePartitioner; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm; +import org.apache.accumulo.core.iterators.user.AgeOffFilter; +import org.apache.accumulo.core.iterators.user.TimestampFilter; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.core.util.TextUtil; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.io.FilenameUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import 
org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; +import org.apache.hadoop.util.ToolRunner; +import org.apache.log4j.BasicConfigurator; +import org.apache.log4j.Logger; +import org.apache.log4j.xml.DOMConfigurator; + +import com.google.common.base.Joiner; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.mr.merge.common.InstanceType; +import mvm.rya.accumulo.mr.merge.gui.DateTimePickerDialog; +import mvm.rya.accumulo.mr.merge.mappers.AccumuloCopyToolMapper; +import mvm.rya.accumulo.mr.merge.mappers.AccumuloRyaRuleMapper; +import mvm.rya.accumulo.mr.merge.mappers.FileCopyToolMapper; +import mvm.rya.accumulo.mr.merge.mappers.RowRuleMapper; +import mvm.rya.accumulo.mr.merge.mappers.MergeToolMapper; +import mvm.rya.accumulo.mr.merge.reducers.MultipleFileReducer; +import mvm.rya.accumulo.mr.merge.util.AccumuloInstanceDriver; +import mvm.rya.accumulo.mr.merge.util.AccumuloQueryRuleset; +import mvm.rya.accumulo.mr.merge.util.AccumuloRyaUtils; +import mvm.rya.accumulo.mr.merge.util.GroupedRow; +import mvm.rya.accumulo.mr.merge.util.TimeUtils; +import mvm.rya.accumulo.mr.merge.util.ToolConfigUtils; +import mvm.rya.accumulo.mr.utils.AccumuloHDFSFileInputFormat; +import mvm.rya.accumulo.mr.utils.MRUtils; +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.RdfCloudTripleStoreUtils; +import mvm.rya.api.layout.TablePrefixLayoutStrategy; +import mvm.rya.indexing.accumulo.ConfigUtils; + +/** + * Handles copying data from a parent instance into a child instance. + */ +public class CopyTool extends AbstractDualInstanceAccumuloMRTool { + private static final Logger log = Logger.getLogger(CopyTool.class); + + /** + * Use this property to set the tables that are going to be copied. The list should + * be a comma-separated string containing the full table names. If not set, then all + * tables will be copied. + */ + public static final String COPY_TABLE_LIST_PROP = "copy.table.list"; + + /** + * Indicates the type of child instance to create. Leave {@code null} or empty to skip creating an + * instance, indicating that it was already created and exists. + */ + public static final String CREATE_CHILD_INSTANCE_TYPE_PROP = "create.child.instance.type"; + + /** + * The time difference between the parent machine and the time server. + */ + public static final String PARENT_TIME_OFFSET_PROP = "time.offset"; + + /** + * The time difference between the child machine and the time server. + */ + public static final String CHILD_TIME_OFFSET_PROP = "time.offset.child"; + + /** + * The host name of the time server to use. + */ + public static final String NTP_SERVER_HOST_PROP = "ntp.server.host"; + + /** + * The URL of the Apache Tomcat server web page running on the parent machine. + */ + public static final String PARENT_TOMCAT_URL_PROP = "tomcat.url"; + + /** + * The URL of the Apache Tomcat server web page running on the child machine. + */ + public static final String CHILD_TOMCAT_URL_PROP = "tomcat.url.child"; + + /** + * The run time of the copy process. + */ + public static final String COPY_RUN_TIME_PROP = "copy.run.time"; + + /** + * "true" to use the NTP server to handle time synchronization. + * "false" (or any other value) to not use the NTP server. 
+ */ + public static final String USE_NTP_SERVER_PROP = "use.ntp.server"; + + /** + * "true" to use file output. "false" to use Accumulo output. + */ + public static final String USE_COPY_FILE_OUTPUT = "use.copy.file.output"; + + /** + * The file path to output the child data to. + */ + public static final String COPY_FILE_OUTPUT_PATH = "copy.file.output.path"; + + /** + * The compression type to use for file output. One of "none", "gz", "lzo", or "snappy". + */ + public static final String COPY_FILE_OUTPUT_COMPRESSION_TYPE = "copy.file.output.compression.type"; + + /** + * "true" to clear the file output directory before copying. "false" to leave the output directory alone. + */ + public static final String USE_COPY_FILE_OUTPUT_DIRECTORY_CLEAR = "use.copy.file.output.directory.clear"; + + /** + * The input directory for importing files into accumulo tables. + */ + public static final String COPY_FILE_IMPORT_DIRECTORY = "copy.file.import.directory"; + + /** + * "true" to read from the input directory. "false" otherwise. + */ + public static final String USE_COPY_FILE_IMPORT = "use.copy.file.import"; + + /** + * "true" to extract a set of rules from a SPARQL query, and only copy statements relevant to those rules. "false" otherwise. + * If set, either the query itself or a query file should also be provided. + */ + public static final String USE_COPY_QUERY_SPARQL = "use.copy.query.sparql"; + + /** + * The text of the query that defines which statements to copy. + */ + public static final String QUERY_STRING_PROP = "ac.copy.query"; + + /** + * The path to a file containing the query that defines which statements to copy. + */ + public static final String QUERY_FILE_PROP = "ac.copy.queryfile"; + + private static final String DIALOG_TITLE = "Select a Start Time/Date"; + private static final String DIALOG_MESSAGE = + "<html>Choose the time of the data to copy.<br>Only parent data AFTER the selected time will be copied to the child.</html>"; + + private String startTime = null; + private boolean useCopyFileOutput = false; + private String baseOutputDir = null; + private String localBaseOutputDir = null; + private String compressionType = null; + private boolean useCopyFileOutputDirectoryClear = false; + private String tempDir = null; + private boolean useCopyFileImport = false; + private boolean useQuery = false; + private String localCopyFileImportDir = null; + private String baseImportDir = null; + + private final List<String> tables = new ArrayList<>(); + + private AccumuloInstanceDriver childAccumuloInstanceDriver = null; + + /** + * Sets up and initializes the copy tool's configuration. + * @throws Exception + */ + public void setup() throws Exception { + super.init(); + + tempDir = conf.get("hadoop.tmp.dir", null); + if (tempDir == null) { + throw new Exception("Invalid hadoop temp directory. 
\"hadoop.tmp.dir\" could not be found in the configuration."); + } + + useCopyFileOutput = conf.getBoolean(USE_COPY_FILE_OUTPUT, false); + baseOutputDir = tempDir + "/copy_tool_file_output/"; + localBaseOutputDir = conf.get(COPY_FILE_OUTPUT_PATH, null); + compressionType = conf.get(COPY_FILE_OUTPUT_COMPRESSION_TYPE, null); + useCopyFileOutputDirectoryClear = conf.getBoolean(USE_COPY_FILE_OUTPUT_DIRECTORY_CLEAR, false); + localCopyFileImportDir = conf.get(COPY_FILE_IMPORT_DIRECTORY, null); + baseImportDir = tempDir + "/copy_tool_import/"; + + startTime = conf.get(MergeTool.START_TIME_PROP, null); + + if (!useCopyFileImport) { + // Display start time dialog if requested + if (MergeTool.USE_START_TIME_DIALOG.equals(startTime)) { + log.info("Select start time from dialog..."); + DateTimePickerDialog dateTimePickerDialog = new DateTimePickerDialog(DIALOG_TITLE, DIALOG_MESSAGE); + dateTimePickerDialog.setVisible(true); + + Date date = dateTimePickerDialog.getSelectedDateTime(); + startTime = MergeTool.START_TIME_FORMATTER.format(date); + conf.set(MergeTool.START_TIME_PROP, startTime); + log.info("Will copy all data after " + date); + } else if (startTime != null) { + try { + Date date = MergeTool.START_TIME_FORMATTER.parse(startTime); + log.info("Will copy all data after " + date); + } catch (ParseException e) { + throw new Exception("Unable to parse the provided start time: " + startTime, e); + } + } + + Date copyRunTime = new Date(); + boolean useTimeSync = conf.getBoolean(USE_NTP_SERVER_PROP, false); + if (useTimeSync) { + String tomcatUrl = conf.get(PARENT_TOMCAT_URL_PROP, null); + String ntpServerHost = conf.get(NTP_SERVER_HOST_PROP, null); + Long timeOffset = null; + Date ntpDate = null; + try { + log.info("Comparing parent machine's time to NTP server time..."); + ntpDate = TimeUtils.getNtpServerDate(ntpServerHost); + Date parentMachineDate = TimeUtils.getMachineDate(tomcatUrl); + boolean isMachineLocal = TimeUtils.isUrlLocalMachine(tomcatUrl); + timeOffset = TimeUtils.getTimeDifference(ntpDate, parentMachineDate, isMachineLocal); + } catch (IOException | ParseException e) { + throw new Exception("Unable to get time difference between machine and NTP server.", e); + } + if (timeOffset != null) { + conf.set(PARENT_TIME_OFFSET_PROP, "" + timeOffset); + } + copyRunTime = ntpDate; + } + String copyRunTimeString = MergeTool.START_TIME_FORMATTER.format(copyRunTime); + if (copyRunTime != null) { + conf.set(COPY_RUN_TIME_PROP, copyRunTimeString); + } + } + + MergeTool.setDuplicateKeys(conf); + + String copyTableListProperty = conf.get(COPY_TABLE_LIST_PROP); + if (StringUtils.isNotBlank(copyTableListProperty)) { + // Copy the tables specified in the config + String[] split = copyTableListProperty.split(","); + tables.addAll(Arrays.asList(split)); + } else if (useCopyFileImport) { + File importDir = new File(localCopyFileImportDir); + String[] files = importDir.list(); + tables.addAll(Arrays.asList(files)); + } else { + // By default copy all tables + tables.add(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX); + tables.add(tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX); + tables.add(tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX); + tables.add(tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX); + tables.add(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX); + tables.add(tablePrefix + RdfCloudTripleStoreConstants.TBL_STATS_SUFFIX); + tables.add(tablePrefix + RdfCloudTripleStoreConstants.TBL_SEL_SUFFIX); + 
tables.add(ConfigUtils.getFreeTextDocTablename(conf)); + tables.add(ConfigUtils.getFreeTextTermTablename(conf)); + tables.add(ConfigUtils.getGeoTablename(conf)); + tables.add(ConfigUtils.getTemporalTableName(conf)); + tables.add(ConfigUtils.getEntityTableName(conf)); + } + if (tables.isEmpty()) { + log.warn("No list of tables to copy was provided."); + } else { + String tablesToCopy = Joiner.on("\r\n\t").join(tables); + log.info("Will attempt to copy the following tables/indices from the parent:\r\n\t" + tablesToCopy); + } + } + + @Override + public int run(String[] strings) throws Exception { + useCopyFileImport = conf.getBoolean(USE_COPY_FILE_IMPORT, false); + useQuery = conf.getBoolean(USE_COPY_QUERY_SPARQL, false); + + if (useCopyFileImport) { + return runImport(); + } else if (useQuery) { + return runQueryCopy(); + } else { + return runCopy(); + } + } + + private int runCopy() throws Exception { + log.info("Setting up Copy Tool..."); + + setup(); + + if (!useCopyFileOutput) { + createChildInstance(conf); + } + + AccumuloRdfConfiguration parentAccumuloRdfConfiguration = new AccumuloRdfConfiguration(conf); + parentAccumuloRdfConfiguration.setTablePrefix(tablePrefix); + Connector parentConnector = AccumuloRyaUtils.setupConnector(parentAccumuloRdfConfiguration); + TableOperations parentTableOperations = parentConnector.tableOperations(); + + for (String table : tables) { + // Check if the parent table exists before creating a job on it + if (parentTableOperations.exists(table)) { + String childTable = table.replaceFirst(tablePrefix, childTablePrefix); + String jobName = "Copy Tool, copying Parent Table: " + table + ", into Child Table: " + childTable + ", " + System.currentTimeMillis(); + log.info("Initializing job: " + jobName); + conf.set(MRUtils.JOB_NAME_PROP, jobName); + conf.set(MergeTool.TABLE_NAME_PROP, table); + + Job job = Job.getInstance(conf); + job.setJarByClass(CopyTool.class); + + setupInputFormat(job); + + AccumuloInputFormat.setInputTableName(job, table); + + // Set input output of the particular job + if (useCopyFileOutput) { + job.setMapOutputKeyClass(Key.class); + job.setMapOutputValueClass(Value.class); + job.setOutputKeyClass(Key.class); + job.setOutputValueClass(Value.class); + } else { + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(Mutation.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Mutation.class); + } + + setupOutputFormat(job, childTable); + + // Set mapper and reducer classes + if (useCopyFileOutput) { + setupSplitsFile(job, parentTableOperations, table, childTable); + job.setMapperClass(FileCopyToolMapper.class); + } else { + job.setMapperClass(AccumuloCopyToolMapper.class); + } + job.setReducerClass(Reducer.class); + + // Submit the job + Date beginTime = new Date(); + log.info("Job for table \"" + table + "\" started: " + beginTime); + int exitCode = job.waitForCompletion(true) ? 
0 : 1; + + if (exitCode == 0) { + if (useCopyFileOutput) { + log.info("Moving data from HDFS to the local file system for the table: " + childTable); + Path hdfsPath = getPath(baseOutputDir, childTable); + Path localPath = getPath(localBaseOutputDir, childTable); + log.info("HDFS directory: " + hdfsPath.toString()); + log.info("Local directory: " + localPath.toString()); + copyHdfsToLocal(hdfsPath, localPath); + } + + Date endTime = new Date(); + log.info("Job for table \"" + table + "\" finished: " + endTime); + log.info("The job took " + (endTime.getTime() - beginTime.getTime()) / 1000 + " seconds."); + } else { + log.error("Job for table \"" + table + "\" Failed!!!"); + return exitCode; + } + } else { + log.warn("The table \"" + table + "\" was NOT found in the parent instance and cannot be copied."); + } + } + + return 0; + } + + private int runImport() throws Exception { + log.info("Setting up Copy Tool for importing..."); + + setup(); + + createChildInstance(conf); + + for (String childTable : tables) { + String jobName = "Copy Tool, importing Exported Parent Table files from: " + getPath(localCopyFileImportDir, childTable).toString() + ", into Child Table: " + childTable + ", " + System.currentTimeMillis(); + log.info("Initializing job: " + jobName); + conf.set(MRUtils.JOB_NAME_PROP, jobName); + + // Submit the job + Date beginTime = new Date(); + log.info("Job for table \"" + childTable + "\" started: " + beginTime); + + createTableIfNeeded(childTable); + importFilesToChildTable(childTable); + + Date endTime = new Date(); + log.info("Job for table \"" + childTable + "\" finished: " + endTime); + log.info("The job took " + (endTime.getTime() - beginTime.getTime()) / 1000 + " seconds."); + } + + return 0; + } + + private int runQueryCopy() throws Exception { + log.info("Setting up Copy Tool with a query-based ruleset..."); + setup(); + if (!useCopyFileOutput) { + createChildInstance(conf); + } + + // Set up the configuration + AccumuloRdfConfiguration aconf = new AccumuloRdfConfiguration(conf); + aconf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, mock); + aconf.setTablePrefix(tablePrefix); + aconf.setFlush(false); + ConfigUtils.setIndexers(aconf); + + // Since we're copying at the statement-level, ignore any given list of tables and determine + // which tables we might need to create based on which indexers are desired. + TablePrefixLayoutStrategy prefixStrategy = new TablePrefixLayoutStrategy(tablePrefix); + tables.clear(); + // Always include core tables + tables.add(prefixStrategy.getSpo()); + tables.add(prefixStrategy.getOsp()); + tables.add(prefixStrategy.getPo()); + // Copy namespaces if they exist + tables.add(prefixStrategy.getNs()); + // Add tables associated with any configured indexers + if (aconf.getBoolean(ConfigUtils.USE_FREETEXT, false)) { + tables.add(ConfigUtils.getFreeTextDocTablename(conf)); + tables.add(ConfigUtils.getFreeTextTermTablename(conf)); + } + if (aconf.getBoolean(ConfigUtils.USE_GEO, false)) { + tables.add(ConfigUtils.getGeoTablename(conf)); + } + if (aconf.getBoolean(ConfigUtils.USE_TEMPORAL, false)) { + tables.add(ConfigUtils.getTemporalTableName(conf)); + } + if (aconf.getBoolean(ConfigUtils.USE_ENTITY, false)) { + tables.add(ConfigUtils.getEntityTableName(conf)); + } + // Ignore anything else, e.g. 
statistics -- must be recalculated for the child if desired + + // Extract the ruleset, and copy the namespace table directly + AccumuloQueryRuleset ruleset = new AccumuloQueryRuleset(aconf); + ruleset.addTable(prefixStrategy.getNs()); + for (String line : ruleset.toString().split("\n")) { + log.info(line); + } + + // Create a Job and configure its input and output + Job job = Job.getInstance(aconf); + job.setJarByClass(this.getClass()); + setupMultiTableInputFormat(job, ruleset); + setupOutputFormat(job, ""); + + if (useCopyFileOutput) { + // Configure job for file output + job.setJobName("Ruleset-based export to file: " + tablePrefix + " -> " + localBaseOutputDir); + // Map (row) to (table+key, key+value) + job.setMapperClass(RowRuleMapper.class); + job.setMapOutputKeyClass(GroupedRow.class); + job.setMapOutputValueClass(GroupedRow.class); + // Group according to table and sort according to key + job.setGroupingComparatorClass(GroupedRow.GroupComparator.class); + job.setSortComparatorClass(GroupedRow.SortComparator.class); + // Reduce ([table+row], rows): output each row to the file for that table, in sorted order + job.setReducerClass(MultipleFileReducer.class); + job.setOutputKeyClass(Key.class); + job.setOutputValueClass(Value.class); + } + else { + // Configure job for table output + job.setJobName("Ruleset-based copy: " + tablePrefix + " -> " + childTablePrefix); + // Map (row): convert to statement, insert to child (for namespace table, output row directly) + job.setMapperClass(AccumuloRyaRuleMapper.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Mutation.class); + job.setNumReduceTasks(0); + // Create the child tables, so mappers don't try to do this in parallel + for (String parentTable : tables) { + String childTable = parentTable.replaceFirst(tablePrefix, childTablePrefix); + createTableIfNeeded(childTable); + } + } + + // Run the job and copy files to local filesystem if needed + Date beginTime = new Date(); + log.info("Job started: " + beginTime); + boolean success = job.waitForCompletion(true); + if (success) { + if (useCopyFileOutput) { + log.info("Moving data from HDFS to the local file system"); + Path baseOutputPath = new Path(baseOutputDir); + for (FileStatus status : FileSystem.get(conf).listStatus(baseOutputPath)) { + if (status.isDirectory()) { + String tableName = status.getPath().getName(); + Path hdfsPath = getPath(baseOutputDir, tableName); + Path localPath = getPath(localBaseOutputDir, tableName); + log.info("HDFS directory: " + hdfsPath.toString()); + log.info("Local directory: " + localPath.toString()); + copyHdfsToLocal(hdfsPath, localPath); + } + } + } + Date endTime = new Date(); + log.info("Job finished: " + endTime); + log.info("The job took " + (endTime.getTime() - beginTime.getTime()) / 1000 + " seconds."); + return 0; + } else { + log.error("Job failed!!!"); + return 1; + } + } + + /** + * Creates the child table if it doesn't already exist. + * @param childTableName the name of the child table. 
+ * @throws IOException + */ + public void createTableIfNeeded(String childTableName) throws IOException { + try { + Configuration childConfig = MergeToolMapper.getChildConfig(conf); + AccumuloRdfConfiguration childAccumuloRdfConfiguration = new AccumuloRdfConfiguration(childConfig); + childAccumuloRdfConfiguration.setTablePrefix(childTablePrefix); + Connector childConnector = AccumuloRyaUtils.setupConnector(childAccumuloRdfConfiguration); + if (!childConnector.tableOperations().exists(childTableName)) { + log.info("Creating table: " + childTableName); + childConnector.tableOperations().create(childTableName); + log.info("Created table: " + childTableName); + log.info("Granting authorizations to table: " + childTableName); + childConnector.securityOperations().grantTablePermission(childUserName, childTableName, TablePermission.WRITE); + log.info("Granted authorizations to table: " + childTableName); + } + } catch (TableExistsException | AccumuloException | AccumuloSecurityException e) { + throw new IOException(e); + } + } + + private void setupSplitsFile(Job job, TableOperations parentTableOperations, String parentTableName, String childTableName) throws Exception { + FileSystem fs = FileSystem.get(conf); + fs.setPermission(getPath(baseOutputDir, childTableName), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); + Path splitsPath = getPath(baseOutputDir, childTableName, "splits.txt"); + Collection<Text> splits = parentTableOperations.listSplits(parentTableName, 100); + log.info("Creating splits file at: " + splitsPath); + try (PrintStream out = new PrintStream(new BufferedOutputStream(fs.create(splitsPath)))) { + for (Text split : splits) { + String encoded = new String(Base64.encodeBase64(TextUtil.getBytes(split))); + out.println(encoded); + } + } + fs.setPermission(splitsPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); + + String userDir = System.getProperty("user.dir"); + // The splits file has a symlink created in the user directory for some reason. + // It might be better to copy the entire file for Windows but it doesn't seem to matter if + // the user directory symlink is broken. + java.nio.file.Files.deleteIfExists(new File(userDir, "splits.txt").toPath()); + //Files.copy(new File(splitsPath.toString()), new File(userDir, "splits.txt")); + job.setPartitionerClass(KeyRangePartitioner.class); + KeyRangePartitioner.setSplitFile(job, splitsPath.toString()); + job.setNumReduceTasks(splits.size() + 1); + } + + /** + * Converts a path string, or a sequence of strings that when joined form a path string, + * to a {@link org.apache.hadoop.fs.Path}. + * @param first The path string or initial part of the path string. + * @param more Additional strings to be joined to form the path string. + * @return the resulting {@link org.apache.hadoop.fs.Path}. + */ + public static Path getPath(String first, String... more) { + java.nio.file.Path path = Paths.get(first, more); + String stringPath = FilenameUtils.separatorsToUnix(path.toAbsolutePath().toString()); + Path hadoopPath = new Path(stringPath); + return hadoopPath; + } + + /** + * Imports the files that hold the table data into the child instance. + * @param childTableName the name of the child table to import. 
+ * @throws Exception + */ + public void importFilesToChildTable(String childTableName) throws Exception { + Configuration childConfig = MergeToolMapper.getChildConfig(conf); + AccumuloRdfConfiguration childAccumuloRdfConfiguration = new AccumuloRdfConfiguration(childConfig); + childAccumuloRdfConfiguration.setTablePrefix(childTablePrefix); + Connector childConnector = AccumuloRyaUtils.setupConnector(childAccumuloRdfConfiguration); + TableOperations childTableOperations = childConnector.tableOperations(); + + Path localWorkDir = getPath(localCopyFileImportDir, childTableName); + Path hdfsBaseWorkDir = getPath(baseImportDir, childTableName); + + copyLocalToHdfs(localWorkDir, hdfsBaseWorkDir); + + Path files = getPath(hdfsBaseWorkDir.toString(), "files"); + Path failures = getPath(hdfsBaseWorkDir.toString(), "failures"); + FileSystem fs = FileSystem.get(conf); + // With HDFS permissions on, we need to make sure the Accumulo user can read/move the files + fs.setPermission(hdfsBaseWorkDir, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); + if (fs.exists(failures)) { + fs.delete(failures, true); + } + fs.mkdirs(failures); + + childTableOperations.importDirectory(childTableName, files.toString(), failures.toString(), false); + } + + /** + * Copies the file from the local file system into the HDFS. + * @param localInputPath the local system input {@link Path}. + * @param hdfsOutputPath the HDFS output {@link Path}. + * @throws IOException + */ + public void copyLocalToHdfs(Path localInputPath, Path hdfsOutputPath) throws IOException { + copyLocalToHdfs(localInputPath, hdfsOutputPath, conf); + } + + /** + * Copies the file from the local file system into the HDFS. + * @param localInputPath the local system input {@link Path}. + * @param hdfsOutputPath the HDFS output {@link Path}. + * @param configuration the {@link Configuration} to use. + * @throws IOException + */ + public static void copyLocalToHdfs(Path localInputPath, Path hdfsOutputPath, Configuration configuration) throws IOException { + FileSystem fs = FileSystem.get(configuration); + fs.copyFromLocalFile(localInputPath, hdfsOutputPath); + } + + /** + * Copies the file from HDFS into the local file system. + * @param hdfsInputPath the HDFS input {@link Path}. + * @param localOutputPath the local system output {@link Path}. + * @throws IOException + */ + public void copyHdfsToLocal(Path hdfsInputPath, Path localOutputPath) throws IOException { + copyHdfsToLocal(hdfsInputPath, localOutputPath, conf); + } + + /** + * Copies the file from HDFS into the local file system. + * @param hdfsInputPath the HDFS input {@link Path}. + * @param localOutputPath the local system output {@link Path}. + * @param configuration the {@link Configuration} to use. 
+ * @throws IOException + */ + public static void copyHdfsToLocal(Path hdfsInputPath, Path localOutputPath, Configuration configuration) throws IOException { + FileSystem fs = FileSystem.get(configuration); + fs.copyToLocalFile(hdfsInputPath, localOutputPath); + } + + @Override + protected void setupInputFormat(Job job) throws AccumuloSecurityException { + if (useCopyFileImport) { + try { + AccumuloHDFSFileInputFormat.setInputPaths(job, localCopyFileImportDir); + } catch (IOException e) { + log.error("Failed to set copy file import directory", e); + } + } else { + // set up accumulo input + if (!hdfsInput) { + job.setInputFormatClass(AccumuloInputFormat.class); + } else { + job.setInputFormatClass(AccumuloHDFSFileInputFormat.class); + } + AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd)); + AccumuloInputFormat.setInputTableName(job, RdfCloudTripleStoreUtils.layoutPrefixToTable(rdfTableLayout, tablePrefix)); + AccumuloInputFormat.setScanAuthorizations(job, authorizations); + if (!mock) { + AccumuloInputFormat.setZooKeeperInstance(job, new ClientConfiguration().withInstance(instance).withZkHosts(zk)); + } else { + AccumuloInputFormat.setMockInstance(job, instance); + } + if (ttl != null) { + IteratorSetting setting = new IteratorSetting(1, "fi", AgeOffFilter.class); + AgeOffFilter.setTTL(setting, Long.valueOf(ttl)); + AccumuloInputFormat.addIterator(job, setting); + } + if (startTime != null) { + IteratorSetting setting = getStartTimeSetting(startTime); + AccumuloInputFormat.addIterator(job, setting); + } + for (IteratorSetting iteratorSetting : AccumuloRyaUtils.COMMON_REG_EX_FILTER_SETTINGS) { + AccumuloInputFormat.addIterator(job, iteratorSetting); + } + } + } + + /** + * Set up job to use AccumuloMultiTableInput format, using the tables/ranges given by a ruleset. 
+ * @param job The Job to configure + * @param rules The ruleset mapping a query to the appropriate tables and ranges + */ + protected void setupMultiTableInputFormat(Job job, AccumuloQueryRuleset rules) throws AccumuloSecurityException { + AbstractInputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd)); + AbstractInputFormat.setScanAuthorizations(job, authorizations); + if (!mock) { + AbstractInputFormat.setZooKeeperInstance(job, new ClientConfiguration().withInstance(instance).withZkHosts(zk)); + } else { + AbstractInputFormat.setMockInstance(job, instance); + } + Map<String, InputTableConfig> configs = rules.getInputConfigs(); + // Add any relevant iterator settings + List<IteratorSetting> additionalSettings = new LinkedList<>(AccumuloRyaUtils.COMMON_REG_EX_FILTER_SETTINGS); + if (ttl != null) { + IteratorSetting ttlSetting = new IteratorSetting(1, "fi", AgeOffFilter.class); + AgeOffFilter.setTTL(ttlSetting, Long.valueOf(ttl)); + additionalSettings.add(ttlSetting); + } + if (startTime != null) { + IteratorSetting startTimeSetting = getStartTimeSetting(startTime); + additionalSettings.add(startTimeSetting); + } + for (Map.Entry<String, InputTableConfig> entry : configs.entrySet()) { + List<IteratorSetting> iterators = entry.getValue().getIterators(); + iterators.addAll(additionalSettings); + entry.getValue().setIterators(iterators); + } + // Set the input format + AccumuloMultiTableInputFormat.setInputTableConfigs(job, configs); + job.setInputFormatClass(AccumuloMultiTableInputFormat.class); + } + + @Override + protected void setupOutputFormat(Job job, String outputTable) throws AccumuloSecurityException { + AccumuloOutputFormat.setConnectorInfo(job, childUserName, new PasswordToken(childPwd)); + AccumuloOutputFormat.setCreateTables(job, true); + AccumuloOutputFormat.setDefaultTableName(job, outputTable); + if (!childMock) { + AccumuloOutputFormat.setZooKeeperInstance(job, new ClientConfiguration().withInstance(childInstance).withZkHosts(childZk)); + } else { + AccumuloOutputFormat.setMockInstance(job, childInstance); + } + if (useCopyFileOutput) { + log.info("Using file output format mode."); + if (StringUtils.isNotBlank(baseOutputDir)) { + Path baseOutputPath; + Path filesOutputPath; + if (StringUtils.isNotBlank(outputTable)) { + filesOutputPath = getPath(baseOutputDir, outputTable, "files"); + baseOutputPath = filesOutputPath.getParent(); + job.setOutputFormatClass(AccumuloFileOutputFormat.class); + } + else { + // If table name is not given, configure output for one level higher: + // it's up to the job to handle subdirectories. Make sure the parent + // exists. 
+ filesOutputPath = getPath(baseOutputDir); + baseOutputPath = filesOutputPath; + LazyOutputFormat.setOutputFormatClass(job, AccumuloFileOutputFormat.class); + MultipleOutputs.setCountersEnabled(job, true); + } + log.info("File output destination: " + filesOutputPath); + if (useCopyFileOutputDirectoryClear) { + try { + clearOutputDir(baseOutputPath); + } catch (IOException e) { + log.error("Error clearing out output path.", e); + } + } + try { + FileSystem fs = FileSystem.get(conf); + fs.mkdirs(filesOutputPath.getParent()); + fs.setPermission(filesOutputPath.getParent(), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); + } catch (IOException e) { + log.error("Failed to set permission for output path.", e); + } + AccumuloFileOutputFormat.setOutputPath(job, filesOutputPath); + + if (StringUtils.isNotBlank(compressionType)) { + if (isValidCompressionType(compressionType)) { + log.info("File compression type: " + compressionType); + AccumuloFileOutputFormat.setCompressionType(job, compressionType); + } else { + log.warn("Invalid compression type: " + compressionType); + } + } + } + } else { + log.info("Using accumulo output format mode."); + job.setOutputFormatClass(AccumuloOutputFormat.class); + } + } + + /** + * Sets up and runs the copy tool with the provided args. + * @param args the arguments list. + * @return the execution result. + */ + public int setupAndRun(String[] args) { + int returnCode = -1; + try { + Configuration conf = new Configuration(); + Set<String> toolArgs = ToolConfigUtils.getUserArguments(conf, args); + if (!toolArgs.isEmpty()) { + String parameters = Joiner.on("\r\n\t").join(toolArgs); + log.info("Running Copy Tool with the following parameters...\r\n\t" + parameters); + } + + returnCode = ToolRunner.run(conf, this, args); + } catch (Exception e) { + log.error("Error running copy tool", e); + } + return returnCode; + } + + public static void main(String[] args) { + String log4jConfiguration = System.getProperties().getProperty("log4j.configuration"); + if (StringUtils.isNotBlank(log4jConfiguration)) { + String parsedConfiguration = StringUtils.removeStart(log4jConfiguration, "file:"); + File configFile = new File(parsedConfiguration); + if (configFile.exists()) { + DOMConfigurator.configure(parsedConfiguration); + } else { + BasicConfigurator.configure(); + } + } + log.info("Starting Copy Tool"); + + Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread thread, Throwable throwable) { + log.error("Uncaught exception in " + thread.getName(), throwable); + } + }); + + CopyTool copyTool = new CopyTool(); + int returnCode = copyTool.setupAndRun(args); + + log.info("Finished running Copy Tool"); + + System.exit(returnCode); + } + + /** + * Creates an {@link IteratorSetting} with a time stamp filter that starts at the specified date. + * @param startTimeString the start time of the filter. + * @return the {@link IteratorSetting}. + */ + public static IteratorSetting getStartTimeSetting(String startTimeString) { + Date date = null; + try { + date = MergeTool.START_TIME_FORMATTER.parse(startTimeString); + } catch (ParseException e) { + throw new IllegalArgumentException("Couldn't parse " + startTimeString, e); + } + return getStartTimeSetting(date); + } + + /** + * Creates an {@link IteratorSetting} with a time stamp filter that starts at the specified date. + * @param date the start {@link Date} of the filter. + * @return the {@link IteratorSetting}. 
+ */ + public static IteratorSetting getStartTimeSetting(Date date) { + return getStartTimeSetting(date.getTime()); + } + + /** + * Creates an {@link IteratorSetting} with a time stamp filter that starts at the specified date. + * @param time the start time of the filter. + * @return the {@link IteratorSetting}. + */ + public static IteratorSetting getStartTimeSetting(long time) { + IteratorSetting setting = new IteratorSetting(1, "startTimeIterator", TimestampFilter.class); + TimestampFilter.setStart(setting, time, true); + TimestampFilter.setEnd(setting, Long.MAX_VALUE, true); + return setting; + } + + /** + * Checks to see if the specified compression type is valid. The compression must be defined in + * {@link Algorithm} to be valid. + * @param compressionType the compression type to check. + * @return {@code true} if the compression type is one of "none", "gz", "lzo", or "snappy". + * {@code false} otherwise. + */ + private static boolean isValidCompressionType(String compressionType) { + for (Algorithm algorithm : Algorithm.values()) { + if (algorithm.getName().equals(compressionType)) { + return true; + } + } + return false; + } + + private void clearOutputDir(Path path) throws IOException { + FileSystem fs = FileSystem.get(conf); + fs.delete(path, true); + } + + private Instance createChildInstance(Configuration config) throws Exception { + Instance instance = null; + String instanceTypeProp = config.get(CREATE_CHILD_INSTANCE_TYPE_PROP); + String childAuth = config.get(MRUtils.AC_AUTH_PROP + MergeTool.CHILD_SUFFIX); + + // Default to distribution cluster if not specified + if (StringUtils.isBlank(instanceTypeProp)) { + instanceTypeProp = InstanceType.DISTRIBUTION.toString(); + } + + InstanceType instanceType = InstanceType.fromName(instanceTypeProp); + switch (instanceType) { + case DISTRIBUTION: + if (childInstance == null) { + throw new IllegalArgumentException("Must specify instance name for distributed mode"); + } else if (childZk == null) { + throw new IllegalArgumentException("Must specify ZooKeeper hosts for distributed mode"); + } + instance = new ZooKeeperInstance(childInstance, childZk); + break; + + case MINI: + childAccumuloInstanceDriver = new AccumuloInstanceDriver("Child", false, true, false, false, childUserName, childPwd, childInstance, childTablePrefix, childAuth); + childAccumuloInstanceDriver.setUpInstance(); + childAccumuloInstanceDriver.setUpTables(); + childZk = childAccumuloInstanceDriver.getZooKeepers(); + MergeTool.setDuplicateKeysForProperty(config, MRUtils.AC_ZK_PROP + MergeTool.CHILD_SUFFIX, childZk); + instance = new ZooKeeperInstance(childInstance, childZk); + break; + + case MOCK: + instance = new MockInstance(childInstance); + break; + + default: + throw new AccumuloException("Unexpected instance type: " + instanceType); + } + + return instance; + } + + /** + * @return the child {@link AccumuloInstanceDriver} or {@code null}. + */ + public AccumuloInstanceDriver getChildAccumuloInstanceDriver() { + return childAccumuloInstanceDriver; + } + + /** + * Shuts down the child {@link AccumuloInstanceDriver} in the {@link CopyTool} if it exists. + * @throws Exception + */ + public void shutdown() throws Exception { + if (childAccumuloInstanceDriver != null) { + childAccumuloInstanceDriver.tearDown(); + } + } +} \ No newline at end of file
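For context before moving on to MergeTool.java: the tool above is driven through ToolRunner, so its settings arrive as -D properties that setupAndRun() passes along. Below is a minimal launch sketch (not part of this commit) for a file-output run. The property keys are the constants CopyTool and MergeTool define; the temp directory, output path, compression, and start time values are placeholder assumptions, and the parent/child connection settings (instance, ZooKeepers, user, password) that would normally be supplied through the MRUtils property keys are omitted here.

    public class CopyToolLaunchExample {
        public static void main(String[] args) throws Exception {
            // setupAndRun() builds a fresh Configuration and hands these args to
            // ToolRunner, whose GenericOptionsParser turns -Dkey=value pairs into
            // configuration properties before CopyTool.run() executes.
            String[] toolArgs = {
                "-Dhadoop.tmp.dir=/tmp",                                   // required by setup()
                "-D" + CopyTool.USE_COPY_FILE_OUTPUT + "=true",            // write RFiles instead of live Mutations
                "-D" + CopyTool.COPY_FILE_OUTPUT_PATH + "=/export/child",  // placeholder local destination
                "-D" + CopyTool.COPY_FILE_OUTPUT_COMPRESSION_TYPE + "=gz", // must name a valid Algorithm
                "-D" + MergeTool.START_TIME_PROP + "=dialog"               // or a yyyyMMddHHmmssSSSz timestamp
            };
            System.exit(new CopyTool().setupAndRun(toolArgs));
        }
    }

With USE_COPY_FILE_OUTPUT set, setupOutputFormat() above routes the job into AccumuloFileOutputFormat and the finished RFiles are copied from HDFS down to COPY_FILE_OUTPUT_PATH; without it, the job writes Mutations directly into the child instance through AccumuloOutputFormat.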
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/MergeTool.java ---------------------------------------------------------------------- diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/MergeTool.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/MergeTool.java new file mode 100644 index 0000000..89fb787 --- /dev/null +++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/MergeTool.java @@ -0,0 +1,554 @@ +package mvm.rya.accumulo.mr.merge; + +/* + * #%L + * mvm.rya.accumulo.mr.merge + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.File; +import java.io.IOException; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.admin.SecurityOperations; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.iterators.user.AgeOffFilter; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.util.ToolRunner; +import org.apache.log4j.BasicConfigurator; +import org.apache.log4j.Logger; +import org.apache.log4j.xml.DOMConfigurator; + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.mr.merge.gui.DateTimePickerDialog; +import mvm.rya.accumulo.mr.merge.mappers.MergeToolMapper; +import mvm.rya.accumulo.mr.merge.util.AccumuloRyaUtils; +import mvm.rya.accumulo.mr.merge.util.TimeUtils; +import mvm.rya.accumulo.mr.merge.util.ToolConfigUtils; +import mvm.rya.accumulo.mr.utils.AccumuloHDFSFileInputFormat; +import mvm.rya.accumulo.mr.utils.MRUtils; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import 
mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.RdfCloudTripleStoreUtils; +import mvm.rya.indexing.accumulo.ConfigUtils; + +/** + * Handles merging a child Accumulo instance's data back into its parent's + * instance. + */ +public class MergeTool extends AbstractDualInstanceAccumuloMRTool { + private static final Logger log = Logger.getLogger(MergeTool.class); + + public static final SimpleDateFormat START_TIME_FORMATTER = new SimpleDateFormat("yyyyMMddHHmmssSSSz"); + + /** + * Appended to certain config property names to indicate that the property is for the child instance. + */ + public static final String CHILD_SUFFIX = ".child"; + + /** + * Suffix added to a child table while it is temporarily imported into the parent instance + * from file, before the tables are merged together. + */ + public static final String TEMP_SUFFIX = "_temp_child"; + + /** + * The time of the data to be included in the copy/merge process. + */ + public static final String START_TIME_PROP = "tool.start.time"; + + /** + * The name of the table to process for the map reduce job. + */ + public static final String TABLE_NAME_PROP = "tool.table.name"; + + /** + * "true" to use file input. "false" to use Accumulo input. + */ + public static final String USE_MERGE_FILE_INPUT = "use.merge.file.input"; + + /** + * The file path to the child data input to merge in. + */ + public static final String MERGE_FILE_INPUT_PATH = "merge.file.input.path"; + + /** + * A value used for the {@link #START_TIME_PROP} property to indicate that a dialog + * should be displayed to select the time. + */ + public static final String USE_START_TIME_DIALOG = "dialog"; + + private static final String DIALOG_TITLE = "Select a Start Time/Date"; + private static final String DIALOG_MESSAGE = + "<html>Choose the time of the data to merge.<br>Only data modified AFTER the selected time will be merged.</html>"; + + private String startTime = null; + private String tempDir = null; + private boolean useMergeFileInput = false; + private String localMergeFileImportDir = null; + private String baseImportDir = null; + + private String tempChildAuths = null; + + private final List<String> tables = new ArrayList<>(); + + /** + * Map of keys that are supposed to use the same values. 
+ */ + public static final ImmutableMap<String, List<String>> DUPLICATE_KEY_MAP = ImmutableMap.<String, List<String>>builder() + .put(MRUtils.AC_MOCK_PROP, ImmutableList.of(ConfigUtils.USE_MOCK_INSTANCE)) + .put(MRUtils.AC_INSTANCE_PROP, ImmutableList.of(ConfigUtils.CLOUDBASE_INSTANCE)) + .put(MRUtils.AC_USERNAME_PROP, ImmutableList.of(ConfigUtils.CLOUDBASE_USER)) + .put(MRUtils.AC_PWD_PROP, ImmutableList.of(ConfigUtils.CLOUDBASE_PASSWORD)) + .put(MRUtils.AC_AUTH_PROP, ImmutableList.of(ConfigUtils.CLOUDBASE_AUTHS, RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH)) + .put(MRUtils.AC_ZK_PROP, ImmutableList.of(ConfigUtils.CLOUDBASE_ZOOKEEPERS)) + .put(MRUtils.TABLE_PREFIX_PROPERTY, ImmutableList.of(ConfigUtils.CLOUDBASE_TBL_PREFIX, RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX)) + .put(MRUtils.AC_MOCK_PROP + CHILD_SUFFIX, ImmutableList.of(ConfigUtils.USE_MOCK_INSTANCE + CHILD_SUFFIX)) + .put(MRUtils.AC_INSTANCE_PROP + CHILD_SUFFIX, ImmutableList.of(ConfigUtils.CLOUDBASE_INSTANCE + CHILD_SUFFIX)) + .put(MRUtils.AC_USERNAME_PROP + CHILD_SUFFIX, ImmutableList.of(ConfigUtils.CLOUDBASE_USER + CHILD_SUFFIX)) + .put(MRUtils.AC_PWD_PROP + CHILD_SUFFIX, ImmutableList.of(ConfigUtils.CLOUDBASE_PASSWORD + CHILD_SUFFIX)) + .put(MRUtils.AC_AUTH_PROP + CHILD_SUFFIX, ImmutableList.of(ConfigUtils.CLOUDBASE_AUTHS + CHILD_SUFFIX, RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH + CHILD_SUFFIX)) + .put(MRUtils.AC_ZK_PROP + CHILD_SUFFIX, ImmutableList.of(ConfigUtils.CLOUDBASE_ZOOKEEPERS + CHILD_SUFFIX)) + .put(MRUtils.TABLE_PREFIX_PROPERTY + CHILD_SUFFIX, ImmutableList.of(ConfigUtils.CLOUDBASE_TBL_PREFIX + CHILD_SUFFIX, RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX + CHILD_SUFFIX)) + .build(); + + + /** + * Sets duplicate keys in the config. + * @param config the {@link Configuration}. + */ + public static void setDuplicateKeys(Configuration config) { + for (Entry<String, List<String>> entry : DUPLICATE_KEY_MAP.entrySet()) { + String key = entry.getKey(); + List<String> duplicateKeys = entry.getValue(); + String value = config.get(key); + if (value != null) { + for (String duplicateKey : duplicateKeys) { + config.set(duplicateKey, value); + } + } + } + } + + /** + * Sets all duplicate keys for the property in the config to the specified value. + * @param config the {@link Configuration}. + * @param property the property to set and all its duplicates. + * @param value the value to set the property to. + */ + public static void setDuplicateKeysForProperty(Configuration config, String property, String value) { + List<String> duplicateKeys = DUPLICATE_KEY_MAP.get(property); + config.set(property, value); + if (duplicateKeys != null) { + for (String key : duplicateKeys) { + config.set(key, value); + } + } + } + + /** + * Sets up and initializes the merge tool's configuration. + * @throws Exception + */ + public void setup() throws Exception { + super.init(); + + tempDir = conf.get("hadoop.tmp.dir", null); + if (tempDir == null) { + throw new Exception("Invalid hadoop temp directory. 
\"hadoop.tmp.dir\" could not be found in the configuration."); + } + + useMergeFileInput = conf.getBoolean(USE_MERGE_FILE_INPUT, false); + localMergeFileImportDir = conf.get(MERGE_FILE_INPUT_PATH, null); + baseImportDir = tempDir + "/merge_tool_file_input/"; + + startTime = conf.get(START_TIME_PROP, null); + + if (!useMergeFileInput) { + // Display start time dialog if requested + if (USE_START_TIME_DIALOG.equals(startTime)) { + log.info("Select start time from dialog..."); + + DateTimePickerDialog dateTimePickerDialog = new DateTimePickerDialog(DIALOG_TITLE, DIALOG_MESSAGE); + dateTimePickerDialog.setVisible(true); + + Date date = dateTimePickerDialog.getSelectedDateTime(); + startTime = START_TIME_FORMATTER.format(date); + conf.set(START_TIME_PROP, startTime); + log.info("Will merge all data after " + date); + } else if (startTime != null) { + try { + Date date = START_TIME_FORMATTER.parse(startTime); + log.info("Will merge all data after " + date); + } catch (ParseException e) { + throw new Exception("Unable to parse the provided start time: " + startTime, e); + } + } + + boolean useTimeSync = conf.getBoolean(CopyTool.USE_NTP_SERVER_PROP, false); + if (useTimeSync) { + String tomcatUrl = conf.get(CopyTool.CHILD_TOMCAT_URL_PROP, null); + String ntpServerHost = conf.get(CopyTool.NTP_SERVER_HOST_PROP, null); + Long timeOffset = null; + try { + log.info("Comparing child machine's time to NTP server time..."); + timeOffset = TimeUtils.getNtpServerAndMachineTimeDifference(ntpServerHost, tomcatUrl); + } catch (IOException | ParseException e) { + throw new Exception("Unable to get time difference between machine and NTP server.", e); + } + if (timeOffset != null) { + conf.set(CopyTool.CHILD_TIME_OFFSET_PROP, "" + timeOffset); + } + } + } + + setDuplicateKeys(conf); + + tables.add(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX); + } + + @Override + public int run(String[] strings) throws Exception { + useMergeFileInput = conf.getBoolean(USE_MERGE_FILE_INPUT, false); + + log.info("Setting up Merge Tool..."); + setup(); + + if (useMergeFileInput) { + // When using file input mode the child instance will use a temporary table in the parent instance to + // store the child table data. The two tables will then be merged together. + copyParentPropertiesToChild(conf); + } + + for (String table : tables) { + String childTable = table.replaceFirst(tablePrefix, childTablePrefix); + String jobName = "Merge Tool, merging Child Table: " + childTable + ", into Parent Table: " + table + ", " + System.currentTimeMillis(); + log.info("Initializing job: " + jobName); + conf.set(MRUtils.JOB_NAME_PROP, jobName); + conf.set(TABLE_NAME_PROP, table); + + Job job = Job.getInstance(conf); + job.setJarByClass(MergeTool.class); + + if (useMergeFileInput) { + importChildFilesToTempParentTable(childTable); + } + + setupInputFormat(job); + + AccumuloInputFormat.setInputTableName(job, table); + + // Set input output of the particular job + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(Mutation.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Mutation.class); + + setupOutputFormat(job, table); + + // Set mapper and reducer classes + job.setMapperClass(MergeToolMapper.class); + job.setReducerClass(Reducer.class); + + // Submit the job + Date beginTime = new Date(); + log.info("Job for table \"" + table + "\" started: " + beginTime); + int exitCode = job.waitForCompletion(true) ? 
0 : 1; + + if (useMergeFileInput && StringUtils.isNotBlank(tempChildAuths)) { + // Clear any of the temporary child auths given to the parent + AccumuloRdfConfiguration parentAccumuloRdfConfiguration = new AccumuloRdfConfiguration(conf); + parentAccumuloRdfConfiguration.setTablePrefix(tablePrefix); + Connector parentConnector = AccumuloRyaUtils.setupConnector(parentAccumuloRdfConfiguration); + SecurityOperations secOps = parentConnector.securityOperations(); + + AccumuloRyaUtils.removeUserAuths(userName, secOps, tempChildAuths); + } + + if (exitCode == 0) { + Date endTime = new Date(); + log.info("Job for table \"" + table + "\" finished: " + endTime); + log.info("The job took " + (endTime.getTime() - beginTime.getTime()) / 1000 + " seconds."); + } else { + log.error("Job for table \"" + table + "\" Failed!!!"); + return exitCode; + } + } + + return 0; + } + + /** + * Creates the temp child table if it doesn't already exist in the parent. + * @param childTableName the name of the child table. + * @throws IOException + */ + public void createTempTableIfNeeded(String childTableName) throws IOException { + try { + AccumuloRdfConfiguration accumuloRdfConfiguration = new AccumuloRdfConfiguration(conf); + accumuloRdfConfiguration.setTablePrefix(childTablePrefix); + Connector connector = AccumuloRyaUtils.setupConnector(accumuloRdfConfiguration); + if (!connector.tableOperations().exists(childTableName)) { + log.info("Creating table: " + childTableName); + connector.tableOperations().create(childTableName); + log.info("Created table: " + childTableName); + log.info("Granting authorizations to table: " + childTableName); + SecurityOperations secOps = connector.securityOperations(); + secOps.grantTablePermission(userName, childTableName, TablePermission.WRITE); + log.info("Granted authorizations to table: " + childTableName); + + Authorizations parentAuths = secOps.getUserAuthorizations(userName); + // Add child authorizations so the temp parent table can be accessed. + if (!parentAuths.equals(childAuthorizations)) { + List<String> childAuthList = findUniqueAuthsFromChild(parentAuths.toString(), childAuthorizations.toString()); + tempChildAuths = Joiner.on(",").join(childAuthList); + log.info("Adding the authorization, \"" + tempChildAuths + "\", to the parent user, \"" + userName + "\""); + Authorizations newAuths = AccumuloRyaUtils.addUserAuths(userName, secOps, new Authorizations(tempChildAuths)); + secOps.changeUserAuthorizations(userName, newAuths); + } + } + } catch (TableExistsException | AccumuloException | AccumuloSecurityException e) { + throw new IOException(e); + } + } + + /** + * Gets any unique user auths that the child has that the parent does not. + * @param parentAuths the comma-separated string of parent authorizations. + * @param childAuths the comma-separated string of child authorizations. + * @return the unique child authorizations that are not in the parent. + */ + private static List<String> findUniqueAuthsFromChild(String parentAuths, String childAuths) { + List<String> parentAuthList = AccumuloRyaUtils.convertAuthStringToList(parentAuths); + List<String> childAuthList = AccumuloRyaUtils.convertAuthStringToList(childAuths); + + childAuthList.removeAll(parentAuthList); + + return childAuthList; + } + + /** + * Imports the child files that hold the table data into the parent instance as a temporary table. + * @param childTableName the name of the child table to import into a temporary parent table. 
+ * @throws Exception + */ + public void importChildFilesToTempParentTable(String childTableName) throws Exception { + // Create a temporary table in the parent instance to import the child files to. Then run the merge process on the parent table and temp child table. + String tempChildTable = childTableName + TEMP_SUFFIX; + + createTempTableIfNeeded(tempChildTable); + + AccumuloRdfConfiguration parentAccumuloRdfConfiguration = new AccumuloRdfConfiguration(conf); + parentAccumuloRdfConfiguration.setTablePrefix(childTablePrefix); + Connector parentConnector = AccumuloRyaUtils.setupConnector(parentAccumuloRdfConfiguration); + TableOperations parentTableOperations = parentConnector.tableOperations(); + + Path localWorkDir = CopyTool.getPath(localMergeFileImportDir, childTableName); + Path hdfsBaseWorkDir = CopyTool.getPath(baseImportDir, childTableName); + + CopyTool.copyLocalToHdfs(localWorkDir, hdfsBaseWorkDir, conf); + + Path files = CopyTool.getPath(hdfsBaseWorkDir.toString(), "files"); + Path failures = CopyTool.getPath(hdfsBaseWorkDir.toString(), "failures"); + FileSystem fs = FileSystem.get(conf); + // With HDFS permissions on, we need to make sure the Accumulo user can read/move the files + fs.setPermission(hdfsBaseWorkDir, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); + if (fs.exists(failures)) { + fs.delete(failures, true); + } + fs.mkdirs(failures); + + parentTableOperations.importDirectory(tempChildTable, files.toString(), failures.toString(), false); + + AccumuloRyaUtils.printTablePretty(tempChildTable, conf); + } + + /** + * Copies all the relevant parent instance config properties to the corresponding child properties. + * @param config the {@link Configuration} to use. + */ + public static void copyParentPropertiesToChild(Configuration config) { + // Copy the parent properties for the child to use. + copyParentPropToChild(config, MRUtils.AC_MOCK_PROP); + copyParentPropToChild(config, MRUtils.AC_INSTANCE_PROP); + copyParentPropToChild(config, MRUtils.AC_USERNAME_PROP); + copyParentPropToChild(config, MRUtils.AC_PWD_PROP); + //copyParentPropToChild(config, MRUtils.TABLE_PREFIX_PROPERTY); + //copyParentPropToChild(config, MRUtils.AC_AUTH_PROP); + //copyParentPropToChild(config, RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH); + copyParentPropToChild(config, MRUtils.AC_ZK_PROP); + + MergeTool.setDuplicateKeys(config); + } + + /** + * Copies the parent config property to the corresponding child property. + * @param config the {@link Configuration} to use. + * @param parentPropertyName the parent property name to use. 
+
+    /**
+     * Copies the parent config property to the corresponding child property.
+     * @param config the {@link Configuration} to use.
+     * @param parentPropertyName the parent property name to use.
+     */
+    public static void copyParentPropToChild(Configuration config, String parentPropertyName) {
+        String parentValue = config.get(parentPropertyName, "");
+        config.set(parentPropertyName + MergeTool.CHILD_SUFFIX, parentValue);
+    }
+
+    @Override
+    protected void setupInputFormat(Job job) throws AccumuloSecurityException {
+        // Set up the Accumulo input: read from the tablet servers, or directly from the table's files in HDFS if configured.
+        if (!hdfsInput) {
+            job.setInputFormatClass(AccumuloInputFormat.class);
+        } else {
+            job.setInputFormatClass(AccumuloHDFSFileInputFormat.class);
+        }
+        AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd));
+        AccumuloInputFormat.setInputTableName(job, RdfCloudTripleStoreUtils.layoutPrefixToTable(rdfTableLayout, tablePrefix));
+        AccumuloInputFormat.setScanAuthorizations(job, authorizations);
+        if (!mock) {
+            AccumuloInputFormat.setZooKeeperInstance(job, new ClientConfiguration().withInstance(instance).withZkHosts(zk));
+        } else {
+            AccumuloInputFormat.setMockInstance(job, instance);
+        }
+        if (ttl != null) {
+            // Age off anything older than the configured TTL before it reaches the mapper.
+            IteratorSetting setting = new IteratorSetting(1, "fi", AgeOffFilter.class);
+            AgeOffFilter.setTTL(setting, Long.valueOf(ttl));
+            AccumuloInputFormat.addIterator(job, setting);
+        }
+        for (IteratorSetting iteratorSetting : AccumuloRyaUtils.COMMON_REG_EX_FILTER_SETTINGS) {
+            AccumuloInputFormat.addIterator(job, iteratorSetting);
+        }
+    }
+
+    /**
+     * Sets up and runs the merge tool with the provided args.
+     * @param args the arguments list.
+     * @return the execution result.
+     */
+    public static int setupAndRun(String[] args) {
+        int returnCode = -1;
+        try {
+            Configuration conf = new Configuration();
+            Set<String> toolArgs = ToolConfigUtils.getUserArguments(conf, args);
+            if (!toolArgs.isEmpty()) {
+                String parameters = Joiner.on("\r\n\t").join(toolArgs);
+                log.info("Running Merge Tool with the following parameters...\r\n\t" + parameters);
+            }
+
+            returnCode = ToolRunner.run(conf, new MergeTool(), args);
+        } catch (Exception e) {
+            log.error("Error running merge tool", e);
+        }
+        return returnCode;
+    }
+
+    public static void main(String[] args) {
+        String log4jConfiguration = System.getProperties().getProperty("log4j.configuration");
+        if (StringUtils.isNotBlank(log4jConfiguration)) {
+            String parsedConfiguration = StringUtils.removeStart(log4jConfiguration, "file:");
+            File configFile = new File(parsedConfiguration);
+            if (configFile.exists()) {
+                DOMConfigurator.configure(parsedConfiguration);
+            } else {
+                BasicConfigurator.configure();
+            }
+        }
+        log.info("Starting Merge Tool");
+
+        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(Thread thread, Throwable throwable) {
+                log.error("Uncaught exception in " + thread.getName(), throwable);
+            }
+        });
+
+        int returnCode = setupAndRun(args);
+
+        log.info("Finished running Merge Tool");
+
+        System.exit(returnCode);
+    }
+
+    /**
+     * Creates a formatted string for the start time based on the specified date and whether the dialog is to be displayed.
+     * @param startDate the start {@link Date} to format.
+     * @param isStartTimeDialogEnabled {@code true} to display the time dialog instead of using the date. {@code false}
+     * to use the provided {@code startDate}.
+     * @return the formatted start time string or {@code "dialog"}.
+     */
+    public static String getStartTimeString(Date startDate, boolean isStartTimeDialogEnabled) {
+        String startTimeString;
+        if (isStartTimeDialogEnabled) {
+            startTimeString = USE_START_TIME_DIALOG; // set start date from dialog box
+        } else {
+            startTimeString = convertDateToStartTimeString(startDate);
+        }
+        return startTimeString;
+    }
+
+    /**
+     * Converts the specified date into a string to use as the start time for the timestamp filter.
+     * @param date the start {@link Date} of the filter that will be formatted as a string.
+     * @return the formatted start time string.
+     */
+    public static String convertDateToStartTimeString(Date date) {
+        String startTimeString = START_TIME_FORMATTER.format(date);
+        return startTimeString;
+    }
+
+    /**
+     * Converts the specified string into a date to use as the start time for the timestamp filter.
+     * @param startTimeString the formatted start time string.
+     * @return the start {@link Date}, or {@code null} if the string could not be parsed.
+     */
+    public static Date convertStartTimeStringToDate(String startTimeString) {
+        Date date;
+        try {
+            date = START_TIME_FORMATTER.parse(startTimeString);
+        } catch (ParseException e) {
+            log.error("Could not parse date", e);
+            return null;
+        }
+        return date;
+    }
+}
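
[Reviewer note] A quick round trip through the start-time helpers above, for orientation. This assumes MergeTool is on the classpath; the exact pattern behind START_TIME_FORMATTER is internal to the tool, so the string should be treated as opaque.

    import java.util.Date;

    import mvm.rya.accumulo.mr.merge.MergeTool;

    public class StartTimeExample {
        public static void main(String[] args) {
            Date original = new Date();

            // false = use the supplied date rather than popping the picker dialog;
            // true would instead return the literal marker string "dialog".
            String asString = MergeTool.getStartTimeString(original, false);

            // Parses back to the original date, up to the formatter's precision;
            // an unparseable string would be logged and yield null.
            Date roundTripped = MergeTool.convertStartTimeStringToDate(asString);
            System.out.println(original + " -> " + asString + " -> " + roundTripped);
        }
    }
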
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/common/InstanceType.java
----------------------------------------------------------------------
diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/common/InstanceType.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/common/InstanceType.java
new file mode 100644
index 0000000..d976384
--- /dev/null
+++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/common/InstanceType.java
@@ -0,0 +1,56 @@
+package mvm.rya.accumulo.mr.merge.common;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.mr.merge
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+
+/**
+ * The type of Accumulo instance.
+ */
+public enum InstanceType {
+    /**
+     * An Accumulo instance that runs using a regular Accumulo distribution.
+     */
+    DISTRIBUTION,
+    /**
+     * An Accumulo instance that runs using a {@link MiniAccumuloCluster}.
+     */
+    MINI,
+    /**
+     * An Accumulo instance that runs using a {@link MockInstance}.
+     */
+    MOCK;
+
+    /**
+     * Finds the instance type by name.
+     * @param name the name to find.
+     * @return the {@link InstanceType} or {@code null} if none could be found.
+     */
+    public static InstanceType fromName(String name) {
+        for (InstanceType instanceType : InstanceType.values()) {
+            if (instanceType.toString().equals(name)) {
+                return instanceType;
+            }
+        }
+        return null;
+    }
+}
\ No newline at end of file
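
[Reviewer note] fromName mirrors Enum.valueOf but returns null for unknown names instead of throwing IllegalArgumentException, which is friendlier for parsing config values. A trivial usage sketch:

    import mvm.rya.accumulo.mr.merge.common.InstanceType;

    public class InstanceTypeExample {
        public static void main(String[] args) {
            // Matching is exact and case-sensitive.
            System.out.println(InstanceType.fromName("MOCK")); // MOCK
            System.out.println(InstanceType.fromName("mock")); // null
        }
    }
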
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/gui/DateTimePickerDialog.java
----------------------------------------------------------------------
diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/gui/DateTimePickerDialog.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/gui/DateTimePickerDialog.java
new file mode 100644
index 0000000..4bf004f
--- /dev/null
+++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/gui/DateTimePickerDialog.java
@@ -0,0 +1,173 @@
+package mvm.rya.accumulo.mr.merge.gui;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.mr.merge
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.awt.GridBagConstraints;
+import java.awt.GridBagLayout;
+import java.awt.Insets;
+import java.awt.event.ActionEvent;
+import java.awt.event.ActionListener;
+import java.util.Calendar;
+import java.util.Date;
+
+import javax.swing.BorderFactory;
+import javax.swing.JButton;
+import javax.swing.JDialog;
+import javax.swing.JLabel;
+import javax.swing.JPanel;
+import javax.swing.JSpinner;
+import javax.swing.JSpinner.DateEditor;
+import javax.swing.SpinnerDateModel;
+import javax.swing.SpinnerModel;
+
+import com.toedter.calendar.JCalendar;
+
+/**
+ * Dialog for picking date and time.
+ */
+public class DateTimePickerDialog extends JDialog {
+    private static final long serialVersionUID = 1L;
+
+    private JCalendar dateChooser;
+    private JSpinner timeSpinner;
+
+    private Date selectedDateTime;
+    private JLabel label;
+
+    /**
+     * Creates a new instance of {@link DateTimePickerDialog}.
+     * @param title the title to display in the dialog's title bar.
+     * @param message the message to display.
+     */
+    public DateTimePickerDialog(String title, String message) {
+        this(null, title, message);
+    }
+
+    /**
+     * Creates a new instance of {@link DateTimePickerDialog}.
+     * @param date the initial date to have the dialog show.
+     * @param title the title to display in the dialog's title bar.
+     * @param message the message to display.
+     */
+    public DateTimePickerDialog(Date date, String title, String message) {
+        // Create a modal dialog
+        super((JDialog) null);
+        setTitle(title);
+        setModalityType(ModalityType.APPLICATION_MODAL);
+        setType(Type.NORMAL);
+
+        setLayout(new GridBagLayout());
+        setDefaultCloseOperation(JDialog.DISPOSE_ON_CLOSE);
+
+        JButton okButton = new JButton("OK");
+        okButton.addActionListener(new ActionListener() {
+            @Override
+            public void actionPerformed(ActionEvent event) {
+                selectedDateTime = findSelectedDateTime();
+
+                // Hide dialog
+                setVisible(false);
+            }
+        });
+
+        getRootPane().setDefaultButton(okButton);
+
+        JPanel dateTimePanel = buildDateTimePanel(date);
+        label = new JLabel(message);
+        label.setBorder(BorderFactory.createEtchedBorder());
+
+        GridBagConstraints c = new GridBagConstraints();
+        c.fill = GridBagConstraints.HORIZONTAL;
+        c.insets = new Insets(5, 5, 5, 5);
+        c.gridx = 0;
+        c.gridy = 0;
+
+        add(dateTimePanel, c);
+        c.gridy++;
+        add(label, c);
+        c.anchor = GridBagConstraints.EAST;
+        c.fill = GridBagConstraints.NONE;
+        c.gridy++;
+        add(okButton, c);
+
+        pack();
+    }
+
+    private JPanel buildDateTimePanel(Date date) {
+        JPanel datePanel = new JPanel();
+
+        dateChooser = new JCalendar();
+        if (date != null) {
+            Calendar calendar = Calendar.getInstance();
+            calendar.setTime(date);
+            dateChooser.setCalendar(calendar);
+        }
+
+        datePanel.add(dateChooser);
+
+        SpinnerModel model = new SpinnerDateModel();
+        timeSpinner = new JSpinner(model);
+        DateEditor editor = new DateEditor(timeSpinner, "HH:mm:ss");
+        timeSpinner.setEditor(editor);
+        if (date != null) {
+            timeSpinner.setValue(date);
+        }
+
+        datePanel.add(timeSpinner);
+
+        return datePanel;
+    }
+
+    private Date findSelectedDateTime() {
+        // Get the values from the date chooser
+        int day = dateChooser.getDayChooser().getDay();
+        int month = dateChooser.getMonthChooser().getMonth();
+        int year = dateChooser.getYearChooser().getYear();
+
+        // Get the values from the time chooser
+        Calendar timeCalendar = Calendar.getInstance();
+        timeCalendar.setTime((Date) timeSpinner.getValue());
+        int hour = timeCalendar.get(Calendar.HOUR_OF_DAY);
+        int minute = timeCalendar.get(Calendar.MINUTE);
+        int second = timeCalendar.get(Calendar.SECOND);
+
+        // Combine these values into a single date object
+        Calendar newCalendar = Calendar.getInstance();
+        newCalendar.set(Calendar.YEAR, year);
+        newCalendar.set(Calendar.MONTH, month);
+        newCalendar.set(Calendar.DATE, day);
+        newCalendar.set(Calendar.HOUR_OF_DAY, hour);
+        newCalendar.set(Calendar.MINUTE, minute);
+        newCalendar.set(Calendar.SECOND, second);
+
+        Date newDate = newCalendar.getTime();
+
+        return newDate;
+    }
+
+    /**
+     * @return the selected date time.
+     */
+    public Date getSelectedDateTime() {
+        return selectedDateTime;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/AccumuloCopyToolMapper.java
----------------------------------------------------------------------
diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/AccumuloCopyToolMapper.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/AccumuloCopyToolMapper.java
new file mode 100644
index 0000000..c530b9e
--- /dev/null
+++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/AccumuloCopyToolMapper.java
@@ -0,0 +1,52 @@
+package mvm.rya.accumulo.mr.merge.mappers;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.mr.merge
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Extended {@link BaseCopyToolMapper} that handles the {@code AccumuloOutputFormat} for the copy tool.
+ */
+public class AccumuloCopyToolMapper extends BaseCopyToolMapper<Key, Value, Text, Mutation> {
+    /**
+     * Creates a new {@link AccumuloCopyToolMapper}.
+     */
+    public AccumuloCopyToolMapper() {
+    }
+
+    @Override
+    protected void map(Key key, Value value, Context context) throws IOException, InterruptedException {
+        //log.trace("Mapping key: " + key + " = " + value);
+        Mutation mutation = makeAddMutation(key, value);
+        context.write(childTableNameText, mutation);
+    }
+
+    private static Mutation makeAddMutation(Key key, Value value) {
+        // Use the Text-based constructor so only the row's valid bytes are copied;
+        // Text.getBytes() can return a backing array longer than the row itself.
+        Mutation mutation = new Mutation(key.getRow());
+        mutation.put(key.getColumnFamily(), key.getColumnQualifier(), key.getColumnVisibilityParsed(), key.getTimestamp(), value);
+        return mutation;
+    }
+}
\ No newline at end of file
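
[Reviewer note] Both this mapper and the rule mapper below perform the same Key-to-Mutation conversion, so here it is pulled out as a standalone sketch. The one subtlety worth remembering is the row bytes: passing the row as a Text (or copying exactly getLength() bytes) matters, because Text.getBytes() exposes a reusable backing array that can be longer than the logical row and would corrupt the mutation's row id.

    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.core.data.Value;

    public class KeyToMutationExample {
        // Rebuilds a scanned key/value pair as an insert mutation, preserving
        // column family, qualifier, visibility, and timestamp.
        public static Mutation toAddMutation(Key key, Value value) {
            Mutation mutation = new Mutation(key.getRow());
            mutation.put(key.getColumnFamily(),
                         key.getColumnQualifier(),
                         key.getColumnVisibilityParsed(),
                         key.getTimestamp(),
                         value);
            return mutation;
        }
    }
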
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/AccumuloRyaRuleMapper.java
----------------------------------------------------------------------
diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/AccumuloRyaRuleMapper.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/AccumuloRyaRuleMapper.java
new file mode 100644
index 0000000..68850c6
--- /dev/null
+++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/AccumuloRyaRuleMapper.java
@@ -0,0 +1,69 @@
+package mvm.rya.accumulo.mr.merge.mappers;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.mr.merge
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.persist.RyaDAOException;
+
+/**
+ * Rule mapper that inserts any copied statements into a child Accumulo Rya instance, and outputs
+ * any raw rows as {@code <Text, Mutation>} pairs.
+ */
+public class AccumuloRyaRuleMapper extends BaseRuleMapper<Text, Mutation> {
+    /**
+     * Inserts a statement into the child Accumulo instance via the child DAO.
+     * @param rstmt RyaStatement to add to the child
+     * @param context Map context, not used
+     * @throws IOException if the DAO encounters an error adding the statement to Accumulo
+     */
+    @Override
+    protected void copyStatement(RyaStatement rstmt, Context context) throws IOException {
+        try {
+            childDao.add(rstmt);
+        } catch (RyaDAOException e) {
+            throw new IOException("Error inserting statement into child Rya DAO", e);
+        }
+    }
+
+    /**
+     * Outputs a row via the Hadoop framework (rather than a Rya interface) to the mapper's configured
+     * child table.
+     * @param key Row's key
+     * @param value Row's value
+     * @param context Context to use for writing
+     * @throws InterruptedException if the framework is interrupted writing output
+     * @throws IOException if the framework encounters an error writing the row to Accumulo
+     */
+    @Override
+    protected void copyRow(Key key, Value value, Context context) throws IOException, InterruptedException {
+        // Build the mutation from the row's Text so only the valid row bytes are copied,
+        // same as in AccumuloCopyToolMapper.
+        Mutation mutation = new Mutation(key.getRow());
+        mutation.put(key.getColumnFamily(), key.getColumnQualifier(), key.getColumnVisibilityParsed(), key.getTimestamp(), value);
+        context.write(childTableNameText, mutation);
+    }
+}
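
[Reviewer note] For context, a minimal sketch of how a mapper emitting {@code <Text, Mutation>} pairs like this one could be wired into a map-only job with AccumuloOutputFormat. The instance, user, and table names are placeholders, and the tool's real job setup lives in its AbstractDualInstanceAccumuloMRTool plumbing rather than here.

    import org.apache.accumulo.core.client.ClientConfiguration;
    import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
    import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    import org.apache.accumulo.core.data.Mutation;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;

    import mvm.rya.accumulo.mr.merge.mappers.AccumuloRyaRuleMapper;

    public class RuleMapperJobSketch {
        public static Job buildJob(Configuration conf) throws Exception {
            Job job = Job.getInstance(conf, "rya-rule-copy-sketch");
            job.setMapperClass(AccumuloRyaRuleMapper.class);
            job.setNumReduceTasks(0); // map-only: mutations go straight to Accumulo
            job.setOutputKeyClass(Text.class);       // key = destination table name
            job.setOutputValueClass(Mutation.class); // value = the row to write

            job.setOutputFormatClass(AccumuloOutputFormat.class);
            AccumuloOutputFormat.setConnectorInfo(job, "childUser", new PasswordToken("childPass"));
            AccumuloOutputFormat.setZooKeeperInstance(job,
                    new ClientConfiguration().withInstance("childInstance").withZkHosts("child-zk:2181"));
            AccumuloOutputFormat.setDefaultTableName(job, "child_rya_spo");
            AccumuloOutputFormat.setCreateTables(job, true);
            return job;
        }
    }
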
