Repository: incubator-rya
Updated Branches:
  refs/heads/master 5867d545d -> 5015f5945


RYA-172 Closes #89.  Fixed the CopyTool unit test not clearing the tmp hadoop
directory between tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/5015f594
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/5015f594
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/5015f594

Branch: refs/heads/master
Commit: 5015f5945ad5ead011dc2c222904b61de803d10d
Parents: 5867d54
Author: ejwhite922 <[email protected]>
Authored: Fri Sep 9 16:34:32 2016 -0400
Committer: pujav65 <[email protected]>
Committed: Tue Sep 27 11:40:43 2016 -0400

----------------------------------------------------------------------
 .../mvm/rya/accumulo/mr/merge/CopyTool.java     | 247 ++++++++++---------
 1 file changed, 128 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5015f594/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/CopyTool.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/CopyTool.java 
b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/CopyTool.java
index 196863b..7314451 100644
--- a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/CopyTool.java
+++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/CopyTool.java
@@ -67,6 +67,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -251,34 +252,34 @@ public class CopyTool extends 
AbstractDualInstanceAccumuloMRTool {
             // Display start time dialog if requested
             if (MergeTool.USE_START_TIME_DIALOG.equals(startTime)) {
                 log.info("Select start time from dialog...");
-                DateTimePickerDialog dateTimePickerDialog = new 
DateTimePickerDialog(DIALOG_TITLE, DIALOG_MESSAGE);
+                final DateTimePickerDialog dateTimePickerDialog = new 
DateTimePickerDialog(DIALOG_TITLE, DIALOG_MESSAGE);
                 dateTimePickerDialog.setVisible(true);
 
-                Date date = dateTimePickerDialog.getSelectedDateTime();
+                final Date date = dateTimePickerDialog.getSelectedDateTime();
                 startTime = MergeTool.START_TIME_FORMATTER.format(date);
                 conf.set(MergeTool.START_TIME_PROP, startTime);
                 log.info("Will copy all data after " + date);
             } else if (startTime != null) {
                 try {
-                    Date date = 
MergeTool.START_TIME_FORMATTER.parse(startTime);
+                    final Date date = 
MergeTool.START_TIME_FORMATTER.parse(startTime);
                     log.info("Will copy all data after " + date);
-                } catch (ParseException e) {
+                } catch (final ParseException e) {
                     throw new Exception("Unable to parse the provided start 
time: " + startTime, e);
                 }
             }
 
             Date copyRunTime = new Date();
-            boolean useTimeSync = conf.getBoolean(USE_NTP_SERVER_PROP, false);
+            final boolean useTimeSync = conf.getBoolean(USE_NTP_SERVER_PROP, 
false);
             if (useTimeSync) {
-                String tomcatUrl = conf.get(PARENT_TOMCAT_URL_PROP, null);
-                String ntpServerHost = conf.get(NTP_SERVER_HOST_PROP, null);
+                final String tomcatUrl = conf.get(PARENT_TOMCAT_URL_PROP, 
null);
+                final String ntpServerHost = conf.get(NTP_SERVER_HOST_PROP, 
null);
                 Long timeOffset = null;
                 Date ntpDate = null;
                 try {
                     log.info("Comparing parent machine's time to NTP server 
time...");
                     ntpDate = TimeUtils.getNtpServerDate(ntpServerHost);
-                    Date parentMachineDate = 
TimeUtils.getMachineDate(tomcatUrl);
-                    boolean isMachineLocal = 
TimeUtils.isUrlLocalMachine(tomcatUrl);
+                    final Date parentMachineDate = 
TimeUtils.getMachineDate(tomcatUrl);
+                    final boolean isMachineLocal = 
TimeUtils.isUrlLocalMachine(tomcatUrl);
                     timeOffset = TimeUtils.getTimeDifference(ntpDate, 
parentMachineDate, isMachineLocal);
                 } catch (IOException | ParseException e) {
                     throw new Exception("Unable to get time difference between 
machine and NTP server.", e);
@@ -288,7 +289,7 @@ public class CopyTool extends 
AbstractDualInstanceAccumuloMRTool {
                 }
                 copyRunTime = ntpDate;
             }
-            String copyRunTimeString = 
MergeTool.START_TIME_FORMATTER.format(copyRunTime);
+            final String copyRunTimeString = 
MergeTool.START_TIME_FORMATTER.format(copyRunTime);
             if (copyRunTime != null) {
                 conf.set(COPY_RUN_TIME_PROP, copyRunTimeString);
             }
@@ -296,14 +297,14 @@ public class CopyTool extends 
AbstractDualInstanceAccumuloMRTool {
 
         MergeTool.setDuplicateKeys(conf);
 
-        String copyTableListProperty = conf.get(COPY_TABLE_LIST_PROP);
+        final String copyTableListProperty = conf.get(COPY_TABLE_LIST_PROP);
         if (StringUtils.isNotBlank(copyTableListProperty)) {
             // Copy the tables specified in the config
-            String[] split = copyTableListProperty.split(",");
+            final String[] split = copyTableListProperty.split(",");
             tables.addAll(Arrays.asList(split));
         } else if (useCopyFileImport) {
-            File importDir = new File(localCopyFileImportDir);
-            String[] files = importDir.list();
+            final File importDir = new File(localCopyFileImportDir);
+            final String[] files = importDir.list();
             tables.addAll(Arrays.asList(files));
         } else {
             // By default copy all tables
@@ -325,13 +326,13 @@ public class CopyTool extends 
AbstractDualInstanceAccumuloMRTool {
         if (tables.isEmpty()) {
             log.warn("No list of tables to copy was provided.");
         } else {
-            String tablesToCopy = Joiner.on("\r\n\t").join(tables);
+            final String tablesToCopy = Joiner.on("\r\n\t").join(tables);
             log.info("Will attempt to copy the following tables/indices from 
the parent:\r\n\t" + tablesToCopy);
         }
     }
 
     @Override
-    public int run(String[] strings) throws Exception {
+    public int run(final String[] strings) throws Exception {
         useCopyFileImport = conf.getBoolean(USE_COPY_FILE_IMPORT, false);
         useQuery = conf.getBoolean(USE_COPY_QUERY_SPARQL, false);
 
@@ -353,21 +354,21 @@ public class CopyTool extends 
AbstractDualInstanceAccumuloMRTool {
             createChildInstance(conf);
         }
 
-        AccumuloRdfConfiguration parentAccumuloRdfConfiguration = new 
AccumuloRdfConfiguration(conf);
+        final AccumuloRdfConfiguration parentAccumuloRdfConfiguration = new 
AccumuloRdfConfiguration(conf);
         parentAccumuloRdfConfiguration.setTablePrefix(tablePrefix);
-        Connector parentConnector = 
AccumuloRyaUtils.setupConnector(parentAccumuloRdfConfiguration);
-        TableOperations parentTableOperations = 
parentConnector.tableOperations();
+        final Connector parentConnector = 
AccumuloRyaUtils.setupConnector(parentAccumuloRdfConfiguration);
+        final TableOperations parentTableOperations = 
parentConnector.tableOperations();
 
-        for (String table : tables) {
+        for (final String table : tables) {
             // Check if the parent table exists before creating a job on it
             if (parentTableOperations.exists(table)) {
-                String childTable = table.replaceFirst(tablePrefix, 
childTablePrefix);
-                String jobName = "Copy Tool, copying Parent Table: " + table + 
", into Child Table: " + childTable + ", " + System.currentTimeMillis();
+                final String childTable = table.replaceFirst(tablePrefix, 
childTablePrefix);
+                final String jobName = "Copy Tool, copying Parent Table: " + 
table + ", into Child Table: " + childTable + ", " + System.currentTimeMillis();
                 log.info("Initializing job: " + jobName);
                 conf.set(MRUtils.JOB_NAME_PROP, jobName);
                 conf.set(MergeTool.TABLE_NAME_PROP, table);
 
-                Job job = Job.getInstance(conf);
+                final Job job = Job.getInstance(conf);
                 job.setJarByClass(CopyTool.class);
 
                 setupAccumuloInput(job);
@@ -399,21 +400,21 @@ public class CopyTool extends 
AbstractDualInstanceAccumuloMRTool {
                 job.setReducerClass(Reducer.class);
 
                 // Submit the job
-                Date beginTime = new Date();
+                final Date beginTime = new Date();
                 log.info("Job for table \"" + table + "\" started: " + 
beginTime);
-                int exitCode = job.waitForCompletion(true) ? 0 : 1;
+                final int exitCode = job.waitForCompletion(true) ? 0 : 1;
 
                 if (exitCode == 0) {
                     if (useCopyFileOutput) {
                         log.info("Moving data from HDFS to the local file 
system for the table: " + childTable);
-                        Path hdfsPath = getPath(baseOutputDir, childTable);
-                        Path localPath = getPath(localBaseOutputDir, 
childTable);
+                        final Path hdfsPath = getPath(baseOutputDir, 
childTable);
+                        final Path localPath = getPath(localBaseOutputDir, 
childTable);
                         log.info("HDFS directory: " + hdfsPath.toString());
                         log.info("Local directory: " + localPath.toString());
                         copyHdfsToLocal(hdfsPath, localPath);
                     }
 
-                    Date endTime = new Date();
+                    final Date endTime = new Date();
                     log.info("Job for table \"" + table + "\" finished: " + 
endTime);
                     log.info("The job took " + (endTime.getTime() - 
beginTime.getTime()) / 1000 + " seconds.");
                 } else {
@@ -435,19 +436,19 @@ public class CopyTool extends 
AbstractDualInstanceAccumuloMRTool {
 
         createChildInstance(conf);
 
-        for (String childTable : tables) {
-            String jobName = "Copy Tool, importing Exported Parent Table files 
from: " + getPath(localCopyFileImportDir, childTable).toString() + ", into 
Child Table: " + childTable + ", " + System.currentTimeMillis();
+        for (final String childTable : tables) {
+            final String jobName = "Copy Tool, importing Exported Parent Table 
files from: " + getPath(localCopyFileImportDir, childTable).toString() + ", 
into Child Table: " + childTable + ", " + System.currentTimeMillis();
             log.info("Initializing job: " + jobName);
             conf.set(MRUtils.JOB_NAME_PROP, jobName);
 
             // Submit the job
-            Date beginTime = new Date();
+            final Date beginTime = new Date();
             log.info("Job for table \"" + childTable + "\" started: " + 
beginTime);
 
             createTableIfNeeded(childTable);
             importFilesToChildTable(childTable);
 
-            Date endTime = new Date();
+            final Date endTime = new Date();
             log.info("Job for table \"" + childTable + "\" finished: " + 
endTime);
             log.info("The job took " + (endTime.getTime() - 
beginTime.getTime()) / 1000 + " seconds.");
         }
@@ -463,7 +464,7 @@ public class CopyTool extends 
AbstractDualInstanceAccumuloMRTool {
         }
 
         // Set up the configuration
-        AccumuloRdfConfiguration aconf = new AccumuloRdfConfiguration(conf);
+        final AccumuloRdfConfiguration aconf = new 
AccumuloRdfConfiguration(conf);
         aconf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, mock);
         aconf.setTablePrefix(tablePrefix);
         aconf.setFlush(false);
@@ -471,7 +472,7 @@ public class CopyTool extends 
AbstractDualInstanceAccumuloMRTool {
 
         // Since we're copying at the statement-level, ignore any given list 
of tables and determine
         // which tables we might need to create based on which indexers are 
desired.
-        TablePrefixLayoutStrategy prefixStrategy = new 
TablePrefixLayoutStrategy(tablePrefix);
+        final TablePrefixLayoutStrategy prefixStrategy = new 
TablePrefixLayoutStrategy(tablePrefix);
         tables.clear();
         // Always include core tables
         tables.add(prefixStrategy.getSpo());
@@ -498,14 +499,14 @@ public class CopyTool extends 
AbstractDualInstanceAccumuloMRTool {
         // Ignore anything else, e.g. statistics -- must be recalculated for 
the child if desired
 
         // Extract the ruleset, and copy the namespace table directly
-        AccumuloQueryRuleset ruleset = new AccumuloQueryRuleset(aconf);
+        final AccumuloQueryRuleset ruleset = new AccumuloQueryRuleset(aconf);
         ruleset.addTable(prefixStrategy.getNs());
-        for (String line : ruleset.toString().split("\n")) {
+        for (final String line : ruleset.toString().split("\n")) {
             log.info(line);
         }
 
         // Create a Job and configure its input and output
-        Job job = Job.getInstance(aconf);
+        final Job job = Job.getInstance(aconf);
         job.setJarByClass(this.getClass());
         setupMultiTableInputFormat(job, ruleset);
         setupAccumuloOutput(job, "");
@@ -534,32 +535,32 @@ public class CopyTool extends 
AbstractDualInstanceAccumuloMRTool {
             job.setOutputValueClass(Mutation.class);
             job.setNumReduceTasks(0);
             // Create the child tables, so mappers don't try to do this in 
parallel
-            for (String parentTable : tables) {
-                String childTable = parentTable.replaceFirst(tablePrefix, 
childTablePrefix);
+            for (final String parentTable : tables) {
+                final String childTable = 
parentTable.replaceFirst(tablePrefix, childTablePrefix);
                 createTableIfNeeded(childTable);
             }
         }
 
         // Run the job and copy files to local filesystem if needed
-        Date beginTime = new Date();
+        final Date beginTime = new Date();
         log.info("Job started: " + beginTime);
-        boolean success = job.waitForCompletion(true);
+        final boolean success = job.waitForCompletion(true);
         if (success) {
             if (useCopyFileOutput) {
                 log.info("Moving data from HDFS to the local file system");
-                Path baseOutputPath = new Path(baseOutputDir);
-                for (FileStatus status : 
FileSystem.get(conf).listStatus(baseOutputPath)) {
+                final Path baseOutputPath = new Path(baseOutputDir);
+                for (final FileStatus status : 
FileSystem.get(conf).listStatus(baseOutputPath)) {
                     if (status.isDirectory()) {
-                        String tableName = status.getPath().getName();
-                        Path hdfsPath = getPath(baseOutputDir, tableName);
-                        Path localPath = getPath(localBaseOutputDir, 
tableName);
+                        final String tableName = status.getPath().getName();
+                        final Path hdfsPath = getPath(baseOutputDir, 
tableName);
+                        final Path localPath = getPath(localBaseOutputDir, 
tableName);
                         log.info("HDFS directory: " + hdfsPath.toString());
                         log.info("Local directory: " + localPath.toString());
                         copyHdfsToLocal(hdfsPath, localPath);
                     }
                 }
             }
-            Date endTime = new Date();
+            final Date endTime = new Date();
             log.info("Job finished: " + endTime);
             log.info("The job took " + (endTime.getTime() - 
beginTime.getTime()) / 1000 + " seconds.");
             return 0;
@@ -574,12 +575,12 @@ public class CopyTool extends 
AbstractDualInstanceAccumuloMRTool {
      * @param childTableName the name of the child table.
      * @throws IOException
      */
-    public void createTableIfNeeded(String childTableName) throws IOException {
+    public void createTableIfNeeded(final String childTableName) throws 
IOException {
         try {
-            Configuration childConfig = MergeToolMapper.getChildConfig(conf);
-            AccumuloRdfConfiguration childAccumuloRdfConfiguration = new 
AccumuloRdfConfiguration(childConfig);
+            final Configuration childConfig = 
MergeToolMapper.getChildConfig(conf);
+            final AccumuloRdfConfiguration childAccumuloRdfConfiguration = new 
AccumuloRdfConfiguration(childConfig);
             childAccumuloRdfConfiguration.setTablePrefix(childTablePrefix);
-            Connector childConnector = 
AccumuloRyaUtils.setupConnector(childAccumuloRdfConfiguration);
+            final Connector childConnector = 
AccumuloRyaUtils.setupConnector(childAccumuloRdfConfiguration);
             if (!childConnector.tableOperations().exists(childTableName)) {
                 log.info("Creating table: " + childTableName);
                 childConnector.tableOperations().create(childTableName);
@@ -593,21 +594,21 @@ public class CopyTool extends 
AbstractDualInstanceAccumuloMRTool {
         }
     }
 
-    private void setupSplitsFile(Job job, TableOperations 
parentTableOperations, String parentTableName, String childTableName) throws 
Exception {
-        FileSystem fs = FileSystem.get(conf);
+    private void setupSplitsFile(final Job job, final TableOperations 
parentTableOperations, final String parentTableName, final String 
childTableName) throws Exception {
+        final FileSystem fs = FileSystem.get(conf);
         fs.setPermission(getPath(baseOutputDir, childTableName), new 
FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
-        Path splitsPath = getPath(baseOutputDir, childTableName, "splits.txt");
-        Collection<Text> splits = 
parentTableOperations.listSplits(parentTableName, 100);
+        final Path splitsPath = getPath(baseOutputDir, childTableName, 
"splits.txt");
+        final Collection<Text> splits = 
parentTableOperations.listSplits(parentTableName, 100);
         log.info("Creating splits file at: " + splitsPath);
         try (PrintStream out = new PrintStream(new 
BufferedOutputStream(fs.create(splitsPath)))) {
-            for (Text split : splits) {
-                String encoded = new 
String(Base64.encodeBase64(TextUtil.getBytes(split)));
+            for (final Text split : splits) {
+                final String encoded = new 
String(Base64.encodeBase64(TextUtil.getBytes(split)));
                 out.println(encoded);
             }
         }
         fs.setPermission(splitsPath, new FsPermission(FsAction.ALL, 
FsAction.ALL, FsAction.ALL));
 
-        String userDir = System.getProperty("user.dir");
+        final String userDir = System.getProperty("user.dir");
         // The splits file has a symlink created in the user directory for 
some reason.
         // It might be better to copy the entire file for Windows but it 
doesn't seem to matter if
         // the user directory symlink is broken.
@@ -625,10 +626,10 @@ public class CopyTool extends 
AbstractDualInstanceAccumuloMRTool {
      * @param more Additional strings to be joined to form the path string.
      * @return the resulting {@link org.apache.hadoop.fs.Path}.
      */
-    public static Path getPath(String first, String... more) {
-        java.nio.file.Path path = Paths.get(first, more);
-        String stringPath = 
FilenameUtils.separatorsToUnix(path.toAbsolutePath().toString());
-        Path hadoopPath = new Path(stringPath);
+    public static Path getPath(final String first, final String... more) {
+        final java.nio.file.Path path = Paths.get(first, more);
+        final String stringPath = 
FilenameUtils.separatorsToUnix(path.toAbsolutePath().toString());
+        final Path hadoopPath = new Path(stringPath);
         return hadoopPath;
     }
 
@@ -637,23 +638,31 @@ public class CopyTool extends 
AbstractDualInstanceAccumuloMRTool {
      * @param childTableName the name of the child table to import.
      * @throws Exception
      */
-    public void importFilesToChildTable(String childTableName) throws 
Exception {
-        Configuration childConfig = MergeToolMapper.getChildConfig(conf);
-        AccumuloRdfConfiguration childAccumuloRdfConfiguration = new 
AccumuloRdfConfiguration(childConfig);
+    public void importFilesToChildTable(final String childTableName) throws 
Exception {
+        final Configuration childConfig = MergeToolMapper.getChildConfig(conf);
+        final AccumuloRdfConfiguration childAccumuloRdfConfiguration = new 
AccumuloRdfConfiguration(childConfig);
         childAccumuloRdfConfiguration.setTablePrefix(childTablePrefix);
-        Connector childConnector = 
AccumuloRyaUtils.setupConnector(childAccumuloRdfConfiguration);
-        TableOperations childTableOperations = 
childConnector.tableOperations();
+        final Connector childConnector = 
AccumuloRyaUtils.setupConnector(childAccumuloRdfConfiguration);
+        final TableOperations childTableOperations = 
childConnector.tableOperations();
 
-        Path localWorkDir = getPath(localCopyFileImportDir, childTableName);
-        Path hdfsBaseWorkDir = getPath(baseImportDir, childTableName);
+        final Path localWorkDir = getPath(localCopyFileImportDir, 
childTableName);
+        final Path hdfsBaseWorkDir = getPath(baseImportDir, childTableName);
 
+        final FileSystem fs = FileSystem.get(conf);
+        if (fs.exists(hdfsBaseWorkDir)) {
+            fs.delete(hdfsBaseWorkDir, true);
+        }
+
+        log.info("Importing from the local directory: " + localWorkDir);
+        log.info("Importing to the HDFS directory: " + hdfsBaseWorkDir);
         copyLocalToHdfs(localWorkDir, hdfsBaseWorkDir);
 
-        Path files = getPath(hdfsBaseWorkDir.toString(), "files");
-        Path failures = getPath(hdfsBaseWorkDir.toString(), "failures");
-        FileSystem fs = FileSystem.get(conf);
+        final Path files = getPath(hdfsBaseWorkDir.toString(), "files");
+        final Path failures = getPath(hdfsBaseWorkDir.toString(), "failures");
+
         // With HDFS permissions on, we need to make sure the Accumulo user 
can read/move the files
-        fs.setPermission(hdfsBaseWorkDir, new FsPermission(FsAction.ALL, 
FsAction.ALL, FsAction.ALL));
+        final FsShell shell = new FsShell(conf);
+        shell.run(new String[] {"-chmod", "777", hdfsBaseWorkDir.toString()});
         if (fs.exists(failures)) {
             fs.delete(failures, true);
         }
@@ -668,7 +677,7 @@ public class CopyTool extends 
AbstractDualInstanceAccumuloMRTool {
      * @param hdfsOutputPath the HDFS output {@link Path}.
      * @throws IOException
      */
-    public void copyLocalToHdfs(Path localInputPath, Path hdfsOutputPath) 
throws IOException {
+    public void copyLocalToHdfs(final Path localInputPath, final Path 
hdfsOutputPath) throws IOException {
         copyLocalToHdfs(localInputPath, hdfsOutputPath, conf);
     }
 
@@ -679,8 +688,8 @@ public class CopyTool extends 
AbstractDualInstanceAccumuloMRTool {
      * @param configuration the {@link Configuration} to use.
      * @throws IOException
      */
-    public static void copyLocalToHdfs(Path localInputPath, Path 
hdfsOutputPath, Configuration configuration) throws IOException {
-        FileSystem fs = FileSystem.get(configuration);
+    public static void copyLocalToHdfs(final Path localInputPath, final Path 
hdfsOutputPath, final Configuration configuration) throws IOException {
+        final FileSystem fs = FileSystem.get(configuration);
         fs.copyFromLocalFile(localInputPath, hdfsOutputPath);
     }
 
@@ -690,7 +699,7 @@ public class CopyTool extends 
AbstractDualInstanceAccumuloMRTool {
      * @param localOutputPath the local system output {@link Path}.
      * @throws IOException
      */
-    public void copyHdfsToLocal(Path hdfsInputPath, Path localOutputPath) 
throws IOException {
+    public void copyHdfsToLocal(final Path hdfsInputPath, final Path 
localOutputPath) throws IOException {
         copyHdfsToLocal(hdfsInputPath, localOutputPath, conf);
     }
 
@@ -701,17 +710,17 @@ public class CopyTool extends 
AbstractDualInstanceAccumuloMRTool {
      * @param configuration the {@link Configuration} to use.
      * @throws IOException
      */
-    public static void copyHdfsToLocal(Path hdfsInputPath, Path 
localOutputPath, Configuration configuration) throws IOException {
-        FileSystem fs = FileSystem.get(configuration);
+    public static void copyHdfsToLocal(final Path hdfsInputPath, final Path 
localOutputPath, final Configuration configuration) throws IOException {
+        final FileSystem fs = FileSystem.get(configuration);
         fs.copyToLocalFile(hdfsInputPath, localOutputPath);
     }
 
     @Override
-    protected void setupAccumuloInput(Job job) throws 
AccumuloSecurityException {
+    protected void setupAccumuloInput(final Job job) throws 
AccumuloSecurityException {
         if (useCopyFileImport) {
             try {
                 AccumuloHDFSFileInputFormat.setInputPaths(job, 
localCopyFileImportDir);
-            } catch (IOException e) {
+            } catch (final IOException e) {
                 log.error("Failed to set copy file import directory", e);
             }
         } else {
@@ -730,15 +739,15 @@ public class CopyTool extends 
AbstractDualInstanceAccumuloMRTool {
                 AccumuloInputFormat.setMockInstance(job, instance);
             }
             if (ttl != null) {
-                IteratorSetting setting = new IteratorSetting(1, "fi", 
AgeOffFilter.class);
+                final IteratorSetting setting = new IteratorSetting(1, "fi", 
AgeOffFilter.class);
                 AgeOffFilter.setTTL(setting, Long.valueOf(ttl));
                 AccumuloInputFormat.addIterator(job, setting);
             }
             if (startTime != null) {
-                IteratorSetting setting = getStartTimeSetting(startTime);
+                final IteratorSetting setting = getStartTimeSetting(startTime);
                 AccumuloInputFormat.addIterator(job, setting);
             }
-            for (IteratorSetting iteratorSetting : 
AccumuloRyaUtils.COMMON_REG_EX_FILTER_SETTINGS) {
+            for (final IteratorSetting iteratorSetting : 
AccumuloRyaUtils.COMMON_REG_EX_FILTER_SETTINGS) {
                 AccumuloInputFormat.addIterator(job, iteratorSetting);
             }
         }
@@ -749,7 +758,7 @@ public class CopyTool extends 
AbstractDualInstanceAccumuloMRTool {
      * @param job The Job to configure
      * @param rules The ruleset mapping a query to the appropriate tables and 
ranges
      */
-    protected void setupMultiTableInputFormat(Job job, AccumuloQueryRuleset 
rules) throws AccumuloSecurityException {
+    protected void setupMultiTableInputFormat(final Job job, final 
AccumuloQueryRuleset rules) throws AccumuloSecurityException {
         AbstractInputFormat.setConnectorInfo(job, userName, new 
PasswordToken(pwd));
         AbstractInputFormat.setScanAuthorizations(job, authorizations);
         if (!mock) {
@@ -757,20 +766,20 @@ public class CopyTool extends 
AbstractDualInstanceAccumuloMRTool {
         } else {
             AbstractInputFormat.setMockInstance(job, instance);
         }
-        Map<String, InputTableConfig> configs = rules.getInputConfigs();
+        final Map<String, InputTableConfig> configs = rules.getInputConfigs();
         // Add any relevant iterator settings
-        List<IteratorSetting> additionalSettings = new 
LinkedList<>(AccumuloRyaUtils.COMMON_REG_EX_FILTER_SETTINGS);
+        final List<IteratorSetting> additionalSettings = new 
LinkedList<>(AccumuloRyaUtils.COMMON_REG_EX_FILTER_SETTINGS);
         if (ttl != null) {
-            IteratorSetting ttlSetting = new IteratorSetting(1, "fi", 
AgeOffFilter.class);
+            final IteratorSetting ttlSetting = new IteratorSetting(1, "fi", 
AgeOffFilter.class);
             AgeOffFilter.setTTL(ttlSetting, Long.valueOf(ttl));
             additionalSettings.add(ttlSetting);
         }
         if (startTime != null) {
-            IteratorSetting startTimeSetting = getStartTimeSetting(startTime);
+            final IteratorSetting startTimeSetting = 
getStartTimeSetting(startTime);
             additionalSettings.add(startTimeSetting);
         }
-        for (Map.Entry<String, InputTableConfig> entry : configs.entrySet()) {
-            List<IteratorSetting> iterators = entry.getValue().getIterators();
+        for (final Map.Entry<String, InputTableConfig> entry : 
configs.entrySet()) {
+            final List<IteratorSetting> iterators = 
entry.getValue().getIterators();
             iterators.addAll(additionalSettings);
             entry.getValue().setIterators(iterators);
         }
@@ -780,7 +789,7 @@ public class CopyTool extends 
AbstractDualInstanceAccumuloMRTool {
     }
 
     @Override
-    protected void setupAccumuloOutput(Job job, String outputTable) throws 
AccumuloSecurityException {
+    protected void setupAccumuloOutput(final Job job, final String 
outputTable) throws AccumuloSecurityException {
         AccumuloOutputFormat.setConnectorInfo(job, childUserName, new 
PasswordToken(childPwd));
         AccumuloOutputFormat.setCreateTables(job, true);
         AccumuloOutputFormat.setDefaultTableName(job, outputTable);
@@ -812,15 +821,15 @@ public class CopyTool extends 
AbstractDualInstanceAccumuloMRTool {
                 if (useCopyFileOutputDirectoryClear) {
                     try {
                         clearOutputDir(baseOutputPath);
-                    } catch (IOException e) {
+                    } catch (final IOException e) {
                         log.error("Error clearing out output path.", e);
                     }
                 }
                 try {
-                    FileSystem fs = FileSystem.get(conf);
+                    final FileSystem fs = FileSystem.get(conf);
                     fs.mkdirs(filesOutputPath.getParent());
                     fs.setPermission(filesOutputPath.getParent(), new 
FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
-                } catch (IOException e) {
+                } catch (final IOException e) {
                     log.error("Failed to set permission for output path.", e);
                 }
                 AccumuloFileOutputFormat.setOutputPath(job, filesOutputPath);
@@ -845,28 +854,28 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool {
      * @param args the arguments list.
      * @return the execution result.
      */
-    public int setupAndRun(String[] args) {
+    public int setupAndRun(final String[] args) {
         int returnCode = -1;
         try {
-            Configuration conf = new Configuration();
-            Set<String> toolArgs = ToolConfigUtils.getUserArguments(conf, args);
+            final Configuration conf = new Configuration();
+            final Set<String> toolArgs = ToolConfigUtils.getUserArguments(conf, args);
             if (!toolArgs.isEmpty()) {
-                String parameters = Joiner.on("\r\n\t").join(toolArgs);
+                final String parameters = Joiner.on("\r\n\t").join(toolArgs);
                 log.info("Running Copy Tool with the following parameters...\r\n\t" + parameters);
             }
 
             returnCode = ToolRunner.run(conf, this, args);
-        } catch (Exception e) {
+        } catch (final Exception e) {
             log.error("Error running copy tool", e);
         }
         return returnCode;
     }
 
-    public static void main(String[] args) {
-        String log4jConfiguration = System.getProperties().getProperty("log4j.configuration");
+    public static void main(final String[] args) {
+        final String log4jConfiguration = System.getProperties().getProperty("log4j.configuration");
         if (StringUtils.isNotBlank(log4jConfiguration)) {
-            String parsedConfiguration = StringUtils.removeStart(log4jConfiguration, "file:");
-            File configFile = new File(parsedConfiguration);
+            final String parsedConfiguration = StringUtils.removeStart(log4jConfiguration, "file:");
+            final File configFile = new File(parsedConfiguration);
             if (configFile.exists()) {
                 DOMConfigurator.configure(parsedConfiguration);
             } else {
@@ -877,13 +886,13 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool {
 
         Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
             @Override
-            public void uncaughtException(Thread thread, Throwable throwable) {
+            public void uncaughtException(final Thread thread, final Throwable throwable) {
                 log.error("Uncaught exception in " + thread.getName(), throwable);
             }
         });
 
-        CopyTool copyTool = new CopyTool();
-        int returnCode = copyTool.setupAndRun(args);
+        final CopyTool copyTool = new CopyTool();
+        final int returnCode = copyTool.setupAndRun(args);
 
         log.info("Finished running Copy Tool");
 
@@ -895,11 +904,11 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool {
      * @param startTimeString the start time of the filter.
      * @return the {@link IteratorSetting}.
      */
-    public static IteratorSetting getStartTimeSetting(String startTimeString) {
+    public static IteratorSetting getStartTimeSetting(final String startTimeString) {
         Date date = null;
         try {
             date = MergeTool.START_TIME_FORMATTER.parse(startTimeString);
-        } catch (ParseException e) {
+        } catch (final ParseException e) {
             throw new IllegalArgumentException("Couldn't parse " + startTimeString, e);
         }
         return getStartTimeSetting(date);
@@ -910,7 +919,7 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool {
      * @param date the start {@link Date} of the filter.
      * @return the {@link IteratorSetting}.
      */
-    public static IteratorSetting getStartTimeSetting(Date date) {
+    public static IteratorSetting getStartTimeSetting(final Date date) {
         return getStartTimeSetting(date.getTime());
     }
 
@@ -919,8 +928,8 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool {
      * @param time the start time of the filter.
      * @return the {@link IteratorSetting}.
      */
-    public static IteratorSetting getStartTimeSetting(long time) {
-        IteratorSetting setting = new IteratorSetting(1, "startTimeIterator", TimestampFilter.class);
+    public static IteratorSetting getStartTimeSetting(final long time) {
+        final IteratorSetting setting = new IteratorSetting(1, "startTimeIterator", TimestampFilter.class);
         TimestampFilter.setStart(setting, time, true);
         TimestampFilter.setEnd(setting, Long.MAX_VALUE, true);
         return setting;
@@ -933,8 +942,8 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool {
      * @return {@code true} if the compression type is one of "none", "gz", "lzo", or "snappy".
      * {@code false} otherwise.
      */
-    private static boolean isValidCompressionType(String compressionType) {
-        for (Algorithm algorithm : Algorithm.values()) {
+    private static boolean isValidCompressionType(final String compressionType) {
+        for (final Algorithm algorithm : Algorithm.values()) {
             if (algorithm.getName().equals(compressionType)) {
                 return true;
             }
@@ -942,22 +951,22 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool {
         return false;
     }
 
-    private void clearOutputDir(Path path) throws IOException {
-        FileSystem fs = FileSystem.get(conf);
+    private void clearOutputDir(final Path path) throws IOException {
+        final FileSystem fs = FileSystem.get(conf);
         fs.delete(path, true);
     }
 
-    private Instance createChildInstance(Configuration config) throws Exception {
+    private Instance createChildInstance(final Configuration config) throws Exception {
         Instance instance = null;
         String instanceTypeProp = config.get(CREATE_CHILD_INSTANCE_TYPE_PROP);
-        String childAuth = config.get(MRUtils.AC_AUTH_PROP + MergeTool.CHILD_SUFFIX);
+        final String childAuth = config.get(MRUtils.AC_AUTH_PROP + MergeTool.CHILD_SUFFIX);
 
         // Default to distribution cluster if not specified
         if (StringUtils.isBlank(instanceTypeProp)) {
             instanceTypeProp = InstanceType.DISTRIBUTION.toString();
         }
 
-        InstanceType instanceType = InstanceType.fromName(instanceTypeProp);
+        final InstanceType instanceType = InstanceType.fromName(instanceTypeProp);
         switch (instanceType) {
             case DISTRIBUTION:
                 if (childInstance == null) {

Reply via email to