Repository: incubator-rya Updated Branches: refs/heads/master 5867d545d -> 5015f5945
RYA-172 Closes #89. Fixed CopyTool unit test from not clearing tmp hadoop directory between tests. Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/5015f594 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/5015f594 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/5015f594 Branch: refs/heads/master Commit: 5015f5945ad5ead011dc2c222904b61de803d10d Parents: 5867d54 Author: ejwhite922 <[email protected]> Authored: Fri Sep 9 16:34:32 2016 -0400 Committer: pujav65 <[email protected]> Committed: Tue Sep 27 11:40:43 2016 -0400 ---------------------------------------------------------------------- .../mvm/rya/accumulo/mr/merge/CopyTool.java | 247 ++++++++++--------- 1 file changed, 128 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5015f594/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/CopyTool.java ---------------------------------------------------------------------- diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/CopyTool.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/CopyTool.java index 196863b..7314451 100644 --- a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/CopyTool.java +++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/CopyTool.java @@ -67,6 +67,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FsShell; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; @@ -251,34 +252,34 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { // Display start time dialog if requested if (MergeTool.USE_START_TIME_DIALOG.equals(startTime)) { log.info("Select start time from dialog..."); - DateTimePickerDialog dateTimePickerDialog = new DateTimePickerDialog(DIALOG_TITLE, DIALOG_MESSAGE); + final DateTimePickerDialog dateTimePickerDialog = new DateTimePickerDialog(DIALOG_TITLE, DIALOG_MESSAGE); dateTimePickerDialog.setVisible(true); - Date date = dateTimePickerDialog.getSelectedDateTime(); + final Date date = dateTimePickerDialog.getSelectedDateTime(); startTime = MergeTool.START_TIME_FORMATTER.format(date); conf.set(MergeTool.START_TIME_PROP, startTime); log.info("Will copy all data after " + date); } else if (startTime != null) { try { - Date date = MergeTool.START_TIME_FORMATTER.parse(startTime); + final Date date = MergeTool.START_TIME_FORMATTER.parse(startTime); log.info("Will copy all data after " + date); - } catch (ParseException e) { + } catch (final ParseException e) { throw new Exception("Unable to parse the provided start time: " + startTime, e); } } Date copyRunTime = new Date(); - boolean useTimeSync = conf.getBoolean(USE_NTP_SERVER_PROP, false); + final boolean useTimeSync = conf.getBoolean(USE_NTP_SERVER_PROP, false); if (useTimeSync) { - String tomcatUrl = conf.get(PARENT_TOMCAT_URL_PROP, null); - String ntpServerHost = conf.get(NTP_SERVER_HOST_PROP, null); + final String tomcatUrl = conf.get(PARENT_TOMCAT_URL_PROP, null); + final String ntpServerHost = conf.get(NTP_SERVER_HOST_PROP, null); Long timeOffset = null; Date ntpDate = null; try { log.info("Comparing parent machine's time to NTP server time..."); ntpDate = TimeUtils.getNtpServerDate(ntpServerHost); - Date parentMachineDate = TimeUtils.getMachineDate(tomcatUrl); - boolean isMachineLocal = TimeUtils.isUrlLocalMachine(tomcatUrl); + final Date parentMachineDate = TimeUtils.getMachineDate(tomcatUrl); + final boolean isMachineLocal = TimeUtils.isUrlLocalMachine(tomcatUrl); timeOffset = TimeUtils.getTimeDifference(ntpDate, parentMachineDate, isMachineLocal); } catch (IOException | ParseException e) { throw new Exception("Unable to get time difference between machine and NTP server.", e); @@ -288,7 +289,7 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { } copyRunTime = ntpDate; } - String copyRunTimeString = MergeTool.START_TIME_FORMATTER.format(copyRunTime); + final String copyRunTimeString = MergeTool.START_TIME_FORMATTER.format(copyRunTime); if (copyRunTime != null) { conf.set(COPY_RUN_TIME_PROP, copyRunTimeString); } @@ -296,14 +297,14 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { MergeTool.setDuplicateKeys(conf); - String copyTableListProperty = conf.get(COPY_TABLE_LIST_PROP); + final String copyTableListProperty = conf.get(COPY_TABLE_LIST_PROP); if (StringUtils.isNotBlank(copyTableListProperty)) { // Copy the tables specified in the config - String[] split = copyTableListProperty.split(","); + final String[] split = copyTableListProperty.split(","); tables.addAll(Arrays.asList(split)); } else if (useCopyFileImport) { - File importDir = new File(localCopyFileImportDir); - String[] files = importDir.list(); + final File importDir = new File(localCopyFileImportDir); + final String[] files = importDir.list(); tables.addAll(Arrays.asList(files)); } else { // By default copy all tables @@ -325,13 +326,13 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { if (tables.isEmpty()) { log.warn("No list of tables to copy was provided."); } else { - String tablesToCopy = Joiner.on("\r\n\t").join(tables); + final String tablesToCopy = Joiner.on("\r\n\t").join(tables); log.info("Will attempt to copy the following tables/indices from the parent:\r\n\t" + tablesToCopy); } } @Override - public int run(String[] strings) throws Exception { + public int run(final String[] strings) throws Exception { useCopyFileImport = conf.getBoolean(USE_COPY_FILE_IMPORT, false); useQuery = conf.getBoolean(USE_COPY_QUERY_SPARQL, false); @@ -353,21 +354,21 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { createChildInstance(conf); } - AccumuloRdfConfiguration parentAccumuloRdfConfiguration = new AccumuloRdfConfiguration(conf); + final AccumuloRdfConfiguration parentAccumuloRdfConfiguration = new AccumuloRdfConfiguration(conf); parentAccumuloRdfConfiguration.setTablePrefix(tablePrefix); - Connector parentConnector = AccumuloRyaUtils.setupConnector(parentAccumuloRdfConfiguration); - TableOperations parentTableOperations = parentConnector.tableOperations(); + final Connector parentConnector = AccumuloRyaUtils.setupConnector(parentAccumuloRdfConfiguration); + final TableOperations parentTableOperations = parentConnector.tableOperations(); - for (String table : tables) { + for (final String table : tables) { // Check if the parent table exists before creating a job on it if (parentTableOperations.exists(table)) { - String childTable = table.replaceFirst(tablePrefix, childTablePrefix); - String jobName = "Copy Tool, copying Parent Table: " + table + ", into Child Table: " + childTable + ", " + System.currentTimeMillis(); + final String childTable = table.replaceFirst(tablePrefix, childTablePrefix); + final String jobName = "Copy Tool, copying Parent Table: " + table + ", into Child Table: " + childTable + ", " + System.currentTimeMillis(); log.info("Initializing job: " + jobName); conf.set(MRUtils.JOB_NAME_PROP, jobName); conf.set(MergeTool.TABLE_NAME_PROP, table); - Job job = Job.getInstance(conf); + final Job job = Job.getInstance(conf); job.setJarByClass(CopyTool.class); setupAccumuloInput(job); @@ -399,21 +400,21 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { job.setReducerClass(Reducer.class); // Submit the job - Date beginTime = new Date(); + final Date beginTime = new Date(); log.info("Job for table \"" + table + "\" started: " + beginTime); - int exitCode = job.waitForCompletion(true) ? 0 : 1; + final int exitCode = job.waitForCompletion(true) ? 0 : 1; if (exitCode == 0) { if (useCopyFileOutput) { log.info("Moving data from HDFS to the local file system for the table: " + childTable); - Path hdfsPath = getPath(baseOutputDir, childTable); - Path localPath = getPath(localBaseOutputDir, childTable); + final Path hdfsPath = getPath(baseOutputDir, childTable); + final Path localPath = getPath(localBaseOutputDir, childTable); log.info("HDFS directory: " + hdfsPath.toString()); log.info("Local directory: " + localPath.toString()); copyHdfsToLocal(hdfsPath, localPath); } - Date endTime = new Date(); + final Date endTime = new Date(); log.info("Job for table \"" + table + "\" finished: " + endTime); log.info("The job took " + (endTime.getTime() - beginTime.getTime()) / 1000 + " seconds."); } else { @@ -435,19 +436,19 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { createChildInstance(conf); - for (String childTable : tables) { - String jobName = "Copy Tool, importing Exported Parent Table files from: " + getPath(localCopyFileImportDir, childTable).toString() + ", into Child Table: " + childTable + ", " + System.currentTimeMillis(); + for (final String childTable : tables) { + final String jobName = "Copy Tool, importing Exported Parent Table files from: " + getPath(localCopyFileImportDir, childTable).toString() + ", into Child Table: " + childTable + ", " + System.currentTimeMillis(); log.info("Initializing job: " + jobName); conf.set(MRUtils.JOB_NAME_PROP, jobName); // Submit the job - Date beginTime = new Date(); + final Date beginTime = new Date(); log.info("Job for table \"" + childTable + "\" started: " + beginTime); createTableIfNeeded(childTable); importFilesToChildTable(childTable); - Date endTime = new Date(); + final Date endTime = new Date(); log.info("Job for table \"" + childTable + "\" finished: " + endTime); log.info("The job took " + (endTime.getTime() - beginTime.getTime()) / 1000 + " seconds."); } @@ -463,7 +464,7 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { } // Set up the configuration - AccumuloRdfConfiguration aconf = new AccumuloRdfConfiguration(conf); + final AccumuloRdfConfiguration aconf = new AccumuloRdfConfiguration(conf); aconf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, mock); aconf.setTablePrefix(tablePrefix); aconf.setFlush(false); @@ -471,7 +472,7 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { // Since we're copying at the statement-level, ignore any given list of tables and determine // which tables we might need to create based on which indexers are desired. - TablePrefixLayoutStrategy prefixStrategy = new TablePrefixLayoutStrategy(tablePrefix); + final TablePrefixLayoutStrategy prefixStrategy = new TablePrefixLayoutStrategy(tablePrefix); tables.clear(); // Always include core tables tables.add(prefixStrategy.getSpo()); @@ -498,14 +499,14 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { // Ignore anything else, e.g. statistics -- must be recalculated for the child if desired // Extract the ruleset, and copy the namespace table directly - AccumuloQueryRuleset ruleset = new AccumuloQueryRuleset(aconf); + final AccumuloQueryRuleset ruleset = new AccumuloQueryRuleset(aconf); ruleset.addTable(prefixStrategy.getNs()); - for (String line : ruleset.toString().split("\n")) { + for (final String line : ruleset.toString().split("\n")) { log.info(line); } // Create a Job and configure its input and output - Job job = Job.getInstance(aconf); + final Job job = Job.getInstance(aconf); job.setJarByClass(this.getClass()); setupMultiTableInputFormat(job, ruleset); setupAccumuloOutput(job, ""); @@ -534,32 +535,32 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { job.setOutputValueClass(Mutation.class); job.setNumReduceTasks(0); // Create the child tables, so mappers don't try to do this in parallel - for (String parentTable : tables) { - String childTable = parentTable.replaceFirst(tablePrefix, childTablePrefix); + for (final String parentTable : tables) { + final String childTable = parentTable.replaceFirst(tablePrefix, childTablePrefix); createTableIfNeeded(childTable); } } // Run the job and copy files to local filesystem if needed - Date beginTime = new Date(); + final Date beginTime = new Date(); log.info("Job started: " + beginTime); - boolean success = job.waitForCompletion(true); + final boolean success = job.waitForCompletion(true); if (success) { if (useCopyFileOutput) { log.info("Moving data from HDFS to the local file system"); - Path baseOutputPath = new Path(baseOutputDir); - for (FileStatus status : FileSystem.get(conf).listStatus(baseOutputPath)) { + final Path baseOutputPath = new Path(baseOutputDir); + for (final FileStatus status : FileSystem.get(conf).listStatus(baseOutputPath)) { if (status.isDirectory()) { - String tableName = status.getPath().getName(); - Path hdfsPath = getPath(baseOutputDir, tableName); - Path localPath = getPath(localBaseOutputDir, tableName); + final String tableName = status.getPath().getName(); + final Path hdfsPath = getPath(baseOutputDir, tableName); + final Path localPath = getPath(localBaseOutputDir, tableName); log.info("HDFS directory: " + hdfsPath.toString()); log.info("Local directory: " + localPath.toString()); copyHdfsToLocal(hdfsPath, localPath); } } } - Date endTime = new Date(); + final Date endTime = new Date(); log.info("Job finished: " + endTime); log.info("The job took " + (endTime.getTime() - beginTime.getTime()) / 1000 + " seconds."); return 0; @@ -574,12 +575,12 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { * @param childTableName the name of the child table. * @throws IOException */ - public void createTableIfNeeded(String childTableName) throws IOException { + public void createTableIfNeeded(final String childTableName) throws IOException { try { - Configuration childConfig = MergeToolMapper.getChildConfig(conf); - AccumuloRdfConfiguration childAccumuloRdfConfiguration = new AccumuloRdfConfiguration(childConfig); + final Configuration childConfig = MergeToolMapper.getChildConfig(conf); + final AccumuloRdfConfiguration childAccumuloRdfConfiguration = new AccumuloRdfConfiguration(childConfig); childAccumuloRdfConfiguration.setTablePrefix(childTablePrefix); - Connector childConnector = AccumuloRyaUtils.setupConnector(childAccumuloRdfConfiguration); + final Connector childConnector = AccumuloRyaUtils.setupConnector(childAccumuloRdfConfiguration); if (!childConnector.tableOperations().exists(childTableName)) { log.info("Creating table: " + childTableName); childConnector.tableOperations().create(childTableName); @@ -593,21 +594,21 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { } } - private void setupSplitsFile(Job job, TableOperations parentTableOperations, String parentTableName, String childTableName) throws Exception { - FileSystem fs = FileSystem.get(conf); + private void setupSplitsFile(final Job job, final TableOperations parentTableOperations, final String parentTableName, final String childTableName) throws Exception { + final FileSystem fs = FileSystem.get(conf); fs.setPermission(getPath(baseOutputDir, childTableName), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); - Path splitsPath = getPath(baseOutputDir, childTableName, "splits.txt"); - Collection<Text> splits = parentTableOperations.listSplits(parentTableName, 100); + final Path splitsPath = getPath(baseOutputDir, childTableName, "splits.txt"); + final Collection<Text> splits = parentTableOperations.listSplits(parentTableName, 100); log.info("Creating splits file at: " + splitsPath); try (PrintStream out = new PrintStream(new BufferedOutputStream(fs.create(splitsPath)))) { - for (Text split : splits) { - String encoded = new String(Base64.encodeBase64(TextUtil.getBytes(split))); + for (final Text split : splits) { + final String encoded = new String(Base64.encodeBase64(TextUtil.getBytes(split))); out.println(encoded); } } fs.setPermission(splitsPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); - String userDir = System.getProperty("user.dir"); + final String userDir = System.getProperty("user.dir"); // The splits file has a symlink created in the user directory for some reason. // It might be better to copy the entire file for Windows but it doesn't seem to matter if // the user directory symlink is broken. @@ -625,10 +626,10 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { * @param more Additional strings to be joined to form the path string. * @return the resulting {@link org.apache.hadoop.fs.Path}. */ - public static Path getPath(String first, String... more) { - java.nio.file.Path path = Paths.get(first, more); - String stringPath = FilenameUtils.separatorsToUnix(path.toAbsolutePath().toString()); - Path hadoopPath = new Path(stringPath); + public static Path getPath(final String first, final String... more) { + final java.nio.file.Path path = Paths.get(first, more); + final String stringPath = FilenameUtils.separatorsToUnix(path.toAbsolutePath().toString()); + final Path hadoopPath = new Path(stringPath); return hadoopPath; } @@ -637,23 +638,31 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { * @param childTableName the name of the child table to import. * @throws Exception */ - public void importFilesToChildTable(String childTableName) throws Exception { - Configuration childConfig = MergeToolMapper.getChildConfig(conf); - AccumuloRdfConfiguration childAccumuloRdfConfiguration = new AccumuloRdfConfiguration(childConfig); + public void importFilesToChildTable(final String childTableName) throws Exception { + final Configuration childConfig = MergeToolMapper.getChildConfig(conf); + final AccumuloRdfConfiguration childAccumuloRdfConfiguration = new AccumuloRdfConfiguration(childConfig); childAccumuloRdfConfiguration.setTablePrefix(childTablePrefix); - Connector childConnector = AccumuloRyaUtils.setupConnector(childAccumuloRdfConfiguration); - TableOperations childTableOperations = childConnector.tableOperations(); + final Connector childConnector = AccumuloRyaUtils.setupConnector(childAccumuloRdfConfiguration); + final TableOperations childTableOperations = childConnector.tableOperations(); - Path localWorkDir = getPath(localCopyFileImportDir, childTableName); - Path hdfsBaseWorkDir = getPath(baseImportDir, childTableName); + final Path localWorkDir = getPath(localCopyFileImportDir, childTableName); + final Path hdfsBaseWorkDir = getPath(baseImportDir, childTableName); + final FileSystem fs = FileSystem.get(conf); + if (fs.exists(hdfsBaseWorkDir)) { + fs.delete(hdfsBaseWorkDir, true); + } + + log.info("Importing from the local directory: " + localWorkDir); + log.info("Importing to the HDFS directory: " + hdfsBaseWorkDir); copyLocalToHdfs(localWorkDir, hdfsBaseWorkDir); - Path files = getPath(hdfsBaseWorkDir.toString(), "files"); - Path failures = getPath(hdfsBaseWorkDir.toString(), "failures"); - FileSystem fs = FileSystem.get(conf); + final Path files = getPath(hdfsBaseWorkDir.toString(), "files"); + final Path failures = getPath(hdfsBaseWorkDir.toString(), "failures"); + // With HDFS permissions on, we need to make sure the Accumulo user can read/move the files - fs.setPermission(hdfsBaseWorkDir, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); + final FsShell shell = new FsShell(conf); + shell.run(new String[] {"-chmod", "777", hdfsBaseWorkDir.toString()}); if (fs.exists(failures)) { fs.delete(failures, true); } @@ -668,7 +677,7 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { * @param hdfsOutputPath the HDFS output {@link Path}. * @throws IOException */ - public void copyLocalToHdfs(Path localInputPath, Path hdfsOutputPath) throws IOException { + public void copyLocalToHdfs(final Path localInputPath, final Path hdfsOutputPath) throws IOException { copyLocalToHdfs(localInputPath, hdfsOutputPath, conf); } @@ -679,8 +688,8 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { * @param configuration the {@link Configuration} to use. * @throws IOException */ - public static void copyLocalToHdfs(Path localInputPath, Path hdfsOutputPath, Configuration configuration) throws IOException { - FileSystem fs = FileSystem.get(configuration); + public static void copyLocalToHdfs(final Path localInputPath, final Path hdfsOutputPath, final Configuration configuration) throws IOException { + final FileSystem fs = FileSystem.get(configuration); fs.copyFromLocalFile(localInputPath, hdfsOutputPath); } @@ -690,7 +699,7 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { * @param localOutputPath the local system output {@link Path}. * @throws IOException */ - public void copyHdfsToLocal(Path hdfsInputPath, Path localOutputPath) throws IOException { + public void copyHdfsToLocal(final Path hdfsInputPath, final Path localOutputPath) throws IOException { copyHdfsToLocal(hdfsInputPath, localOutputPath, conf); } @@ -701,17 +710,17 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { * @param configuration the {@link Configuration} to use. * @throws IOException */ - public static void copyHdfsToLocal(Path hdfsInputPath, Path localOutputPath, Configuration configuration) throws IOException { - FileSystem fs = FileSystem.get(configuration); + public static void copyHdfsToLocal(final Path hdfsInputPath, final Path localOutputPath, final Configuration configuration) throws IOException { + final FileSystem fs = FileSystem.get(configuration); fs.copyToLocalFile(hdfsInputPath, localOutputPath); } @Override - protected void setupAccumuloInput(Job job) throws AccumuloSecurityException { + protected void setupAccumuloInput(final Job job) throws AccumuloSecurityException { if (useCopyFileImport) { try { AccumuloHDFSFileInputFormat.setInputPaths(job, localCopyFileImportDir); - } catch (IOException e) { + } catch (final IOException e) { log.error("Failed to set copy file import directory", e); } } else { @@ -730,15 +739,15 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { AccumuloInputFormat.setMockInstance(job, instance); } if (ttl != null) { - IteratorSetting setting = new IteratorSetting(1, "fi", AgeOffFilter.class); + final IteratorSetting setting = new IteratorSetting(1, "fi", AgeOffFilter.class); AgeOffFilter.setTTL(setting, Long.valueOf(ttl)); AccumuloInputFormat.addIterator(job, setting); } if (startTime != null) { - IteratorSetting setting = getStartTimeSetting(startTime); + final IteratorSetting setting = getStartTimeSetting(startTime); AccumuloInputFormat.addIterator(job, setting); } - for (IteratorSetting iteratorSetting : AccumuloRyaUtils.COMMON_REG_EX_FILTER_SETTINGS) { + for (final IteratorSetting iteratorSetting : AccumuloRyaUtils.COMMON_REG_EX_FILTER_SETTINGS) { AccumuloInputFormat.addIterator(job, iteratorSetting); } } @@ -749,7 +758,7 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { * @param job The Job to configure * @param rules The ruleset mapping a query to the appropriate tables and ranges */ - protected void setupMultiTableInputFormat(Job job, AccumuloQueryRuleset rules) throws AccumuloSecurityException { + protected void setupMultiTableInputFormat(final Job job, final AccumuloQueryRuleset rules) throws AccumuloSecurityException { AbstractInputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd)); AbstractInputFormat.setScanAuthorizations(job, authorizations); if (!mock) { @@ -757,20 +766,20 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { } else { AbstractInputFormat.setMockInstance(job, instance); } - Map<String, InputTableConfig> configs = rules.getInputConfigs(); + final Map<String, InputTableConfig> configs = rules.getInputConfigs(); // Add any relevant iterator settings - List<IteratorSetting> additionalSettings = new LinkedList<>(AccumuloRyaUtils.COMMON_REG_EX_FILTER_SETTINGS); + final List<IteratorSetting> additionalSettings = new LinkedList<>(AccumuloRyaUtils.COMMON_REG_EX_FILTER_SETTINGS); if (ttl != null) { - IteratorSetting ttlSetting = new IteratorSetting(1, "fi", AgeOffFilter.class); + final IteratorSetting ttlSetting = new IteratorSetting(1, "fi", AgeOffFilter.class); AgeOffFilter.setTTL(ttlSetting, Long.valueOf(ttl)); additionalSettings.add(ttlSetting); } if (startTime != null) { - IteratorSetting startTimeSetting = getStartTimeSetting(startTime); + final IteratorSetting startTimeSetting = getStartTimeSetting(startTime); additionalSettings.add(startTimeSetting); } - for (Map.Entry<String, InputTableConfig> entry : configs.entrySet()) { - List<IteratorSetting> iterators = entry.getValue().getIterators(); + for (final Map.Entry<String, InputTableConfig> entry : configs.entrySet()) { + final List<IteratorSetting> iterators = entry.getValue().getIterators(); iterators.addAll(additionalSettings); entry.getValue().setIterators(iterators); } @@ -780,7 +789,7 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { } @Override - protected void setupAccumuloOutput(Job job, String outputTable) throws AccumuloSecurityException { + protected void setupAccumuloOutput(final Job job, final String outputTable) throws AccumuloSecurityException { AccumuloOutputFormat.setConnectorInfo(job, childUserName, new PasswordToken(childPwd)); AccumuloOutputFormat.setCreateTables(job, true); AccumuloOutputFormat.setDefaultTableName(job, outputTable); @@ -812,15 +821,15 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { if (useCopyFileOutputDirectoryClear) { try { clearOutputDir(baseOutputPath); - } catch (IOException e) { + } catch (final IOException e) { log.error("Error clearing out output path.", e); } } try { - FileSystem fs = FileSystem.get(conf); + final FileSystem fs = FileSystem.get(conf); fs.mkdirs(filesOutputPath.getParent()); fs.setPermission(filesOutputPath.getParent(), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); - } catch (IOException e) { + } catch (final IOException e) { log.error("Failed to set permission for output path.", e); } AccumuloFileOutputFormat.setOutputPath(job, filesOutputPath); @@ -845,28 +854,28 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { * @param args the arguments list. * @return the execution result. */ - public int setupAndRun(String[] args) { + public int setupAndRun(final String[] args) { int returnCode = -1; try { - Configuration conf = new Configuration(); - Set<String> toolArgs = ToolConfigUtils.getUserArguments(conf, args); + final Configuration conf = new Configuration(); + final Set<String> toolArgs = ToolConfigUtils.getUserArguments(conf, args); if (!toolArgs.isEmpty()) { - String parameters = Joiner.on("\r\n\t").join(toolArgs); + final String parameters = Joiner.on("\r\n\t").join(toolArgs); log.info("Running Copy Tool with the following parameters...\r\n\t" + parameters); } returnCode = ToolRunner.run(conf, this, args); - } catch (Exception e) { + } catch (final Exception e) { log.error("Error running copy tool", e); } return returnCode; } - public static void main(String[] args) { - String log4jConfiguration = System.getProperties().getProperty("log4j.configuration"); + public static void main(final String[] args) { + final String log4jConfiguration = System.getProperties().getProperty("log4j.configuration"); if (StringUtils.isNotBlank(log4jConfiguration)) { - String parsedConfiguration = StringUtils.removeStart(log4jConfiguration, "file:"); - File configFile = new File(parsedConfiguration); + final String parsedConfiguration = StringUtils.removeStart(log4jConfiguration, "file:"); + final File configFile = new File(parsedConfiguration); if (configFile.exists()) { DOMConfigurator.configure(parsedConfiguration); } else { @@ -877,13 +886,13 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { @Override - public void uncaughtException(Thread thread, Throwable throwable) { + public void uncaughtException(final Thread thread, final Throwable throwable) { log.error("Uncaught exception in " + thread.getName(), throwable); } }); - CopyTool copyTool = new CopyTool(); - int returnCode = copyTool.setupAndRun(args); + final CopyTool copyTool = new CopyTool(); + final int returnCode = copyTool.setupAndRun(args); log.info("Finished running Copy Tool"); @@ -895,11 +904,11 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { * @param startTimeString the start time of the filter. * @return the {@link IteratorSetting}. */ - public static IteratorSetting getStartTimeSetting(String startTimeString) { + public static IteratorSetting getStartTimeSetting(final String startTimeString) { Date date = null; try { date = MergeTool.START_TIME_FORMATTER.parse(startTimeString); - } catch (ParseException e) { + } catch (final ParseException e) { throw new IllegalArgumentException("Couldn't parse " + startTimeString, e); } return getStartTimeSetting(date); @@ -910,7 +919,7 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { * @param date the start {@link Date} of the filter. * @return the {@link IteratorSetting}. */ - public static IteratorSetting getStartTimeSetting(Date date) { + public static IteratorSetting getStartTimeSetting(final Date date) { return getStartTimeSetting(date.getTime()); } @@ -919,8 +928,8 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { * @param time the start time of the filter. * @return the {@link IteratorSetting}. */ - public static IteratorSetting getStartTimeSetting(long time) { - IteratorSetting setting = new IteratorSetting(1, "startTimeIterator", TimestampFilter.class); + public static IteratorSetting getStartTimeSetting(final long time) { + final IteratorSetting setting = new IteratorSetting(1, "startTimeIterator", TimestampFilter.class); TimestampFilter.setStart(setting, time, true); TimestampFilter.setEnd(setting, Long.MAX_VALUE, true); return setting; @@ -933,8 +942,8 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { * @return {@code true} if the compression type is one of "none", "gz", "lzo", or "snappy". * {@code false} otherwise. */ - private static boolean isValidCompressionType(String compressionType) { - for (Algorithm algorithm : Algorithm.values()) { + private static boolean isValidCompressionType(final String compressionType) { + for (final Algorithm algorithm : Algorithm.values()) { if (algorithm.getName().equals(compressionType)) { return true; } @@ -942,22 +951,22 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { return false; } - private void clearOutputDir(Path path) throws IOException { - FileSystem fs = FileSystem.get(conf); + private void clearOutputDir(final Path path) throws IOException { + final FileSystem fs = FileSystem.get(conf); fs.delete(path, true); } - private Instance createChildInstance(Configuration config) throws Exception { + private Instance createChildInstance(final Configuration config) throws Exception { Instance instance = null; String instanceTypeProp = config.get(CREATE_CHILD_INSTANCE_TYPE_PROP); - String childAuth = config.get(MRUtils.AC_AUTH_PROP + MergeTool.CHILD_SUFFIX); + final String childAuth = config.get(MRUtils.AC_AUTH_PROP + MergeTool.CHILD_SUFFIX); // Default to distribution cluster if not specified if (StringUtils.isBlank(instanceTypeProp)) { instanceTypeProp = InstanceType.DISTRIBUTION.toString(); } - InstanceType instanceType = InstanceType.fromName(instanceTypeProp); + final InstanceType instanceType = InstanceType.fromName(instanceTypeProp); switch (instanceType) { case DISTRIBUTION: if (childInstance == null) {
