Repository: falcon
Updated Branches:
  refs/heads/master 2b84dd56d -> ee644dd2a
FALCON-2017 Fix HiveDR extension issues

Author: Sowmya Ramesh <[email protected]>
Reviewers: "Venkat Ranganathan <[email protected]>, Balu Vellanki <[email protected]>"

Closes #175 from sowmyaramesh/FALCON-2017

Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/ee644dd2
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/ee644dd2
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/ee644dd2

Branch: refs/heads/master
Commit: ee644dd2a6639085b469f8fae96580671f4d0fe9
Parents: 2b84dd5
Author: Sowmya Ramesh <[email protected]>
Authored: Tue Jun 7 14:06:52 2016 -0700
Committer: bvellanki <[email protected]>
Committed: Tue Jun 7 14:06:52 2016 -0700

----------------------------------------------------------------------
 .../runtime/hive-mirroring-secure-workflow.xml  | 48 ++++++++++----------
 .../runtime/hive-mirroring-workflow.xml         | 48 ++++++++++----------
 .../java/org/apache/falcon/hive/HiveDRArgs.java |  7 +--
 .../org/apache/falcon/hive/HiveDROptions.java   | 37 ++++++++++-----
 .../java/org/apache/falcon/hive/HiveDRTool.java |  7 +++
 .../falcon/hive/mapreduce/CopyMapper.java       |  2 +-
 .../org/apache/falcon/hive/util/EventUtils.java | 23 ++++------
 .../org/apache/falcon/hive/util/FileUtils.java  |  4 +-
 .../java/org/apache/falcon/hive/DRTest.java     |  4 +-
 .../mirroring/hive/HiveMirroringExtension.java  | 17 +++++++
 .../hive/HiveMirroringExtensionProperties.java  |  5 +-
 11 files changed, 120 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/ee644dd2/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-secure-workflow.xml
----------------------------------------------------------------------
diff --git a/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-secure-workflow.xml b/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-secure-workflow.xml
index 4bf048f..63e9a67 100644
--- a/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-secure-workflow.xml
+++ b/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-secure-workflow.xml
@@ -96,18 +96,16 @@
             <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
             <arg>-Dmapred.job.queue.name=${queueName}</arg>
             <arg>-Dmapred.job.priority=${jobPriority}</arg>
-            <arg>-falconLibPath</arg>
-            <arg>${wf:conf("falcon.libpath")}</arg>
             <arg>-sourceCluster</arg>
             <arg>${sourceCluster}</arg>
             <arg>-sourceMetastoreUri</arg>
             <arg>${sourceMetastoreUri}</arg>
             <arg>-sourceHiveServer2Uri</arg>
             <arg>${sourceHiveServer2Uri}</arg>
-            <arg>-sourceDatabase</arg>
-            <arg>${sourceDatabase}</arg>
-            <arg>-sourceTable</arg>
-            <arg>${sourceTable}</arg>
+            <arg>-sourceDatabases</arg>
+            <arg>${sourceDatabases}</arg>
+            <arg>-sourceTables</arg>
+            <arg>${sourceTables}</arg>
             <arg>-sourceStagingPath</arg>
             <arg>${sourceStagingPath}</arg>
             <arg>-sourceNN</arg>
@@ -144,8 +142,10 @@
             <arg>${clusterForJobNNKerberosPrincipal}</arg>
             <arg>-tdeEncryptionEnabled</arg>
             <arg>${tdeEncryptionEnabled}</arg>
-            <arg>-jobName</arg>
-            <arg>${jobName}-${nominalTime}</arg>
+            <arg>-hiveJobName</arg>
+            <arg>${hiveJobName}</arg>
+            <arg>-sourceDatabase</arg>
+            <arg>${sourceDatabase}</arg>
             <arg>-executionStage</arg>
             <arg>lastevents</arg>
         </java>
@@ -190,8 +190,6 @@
             <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
             <arg>-Dmapred.job.queue.name=${queueName}</arg>
             <arg>-Dmapred.job.priority=${jobPriority}</arg>
-            <arg>-falconLibPath</arg>
-            <arg>${wf:conf("falcon.libpath")}</arg>
             <arg>-replicationMaxMaps</arg>
             <arg>${replicationMaxMaps}</arg>
             <arg>-distcpMaxMaps</arg>
@@ -202,10 +200,10 @@
             <arg>${sourceMetastoreUri}</arg>
             <arg>-sourceHiveServer2Uri</arg>
             <arg>${sourceHiveServer2Uri}</arg>
-            <arg>-sourceDatabase</arg>
-            <arg>${sourceDatabase}</arg>
-            <arg>-sourceTable</arg>
-            <arg>${sourceTable}</arg>
+            <arg>-sourceDatabases</arg>
+            <arg>${sourceDatabases}</arg>
+            <arg>-sourceTables</arg>
+            <arg>${sourceTables}</arg>
             <arg>-sourceStagingPath</arg>
             <arg>${sourceStagingPath}</arg>
             <arg>-sourceNN</arg>
@@ -244,8 +242,10 @@
             <arg>${clusterForJobNNKerberosPrincipal}</arg>
             <arg>-tdeEncryptionEnabled</arg>
             <arg>${tdeEncryptionEnabled}</arg>
-            <arg>-jobName</arg>
-            <arg>${jobName}-${nominalTime}</arg>
+            <arg>-hiveJobName</arg>
+            <arg>${hiveJobName}</arg>
+            <arg>-sourceDatabase</arg>
+            <arg>${sourceDatabase}</arg>
             <arg>-executionStage</arg>
             <arg>export</arg>
             <arg>-counterLogDir</arg>
@@ -292,8 +292,6 @@
             <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
             <arg>-Dmapred.job.queue.name=${queueName}</arg>
             <arg>-Dmapred.job.priority=${jobPriority}</arg>
-            <arg>-falconLibPath</arg>
-            <arg>${wf:conf("falcon.libpath")}</arg>
             <arg>-replicationMaxMaps</arg>
             <arg>${replicationMaxMaps}</arg>
             <arg>-distcpMaxMaps</arg>
@@ -304,10 +302,10 @@
             <arg>${sourceMetastoreUri}</arg>
             <arg>-sourceHiveServer2Uri</arg>
             <arg>${sourceHiveServer2Uri}</arg>
-            <arg>-sourceDatabase</arg>
-            <arg>${sourceDatabase}</arg>
-            <arg>-sourceTable</arg>
-            <arg>${sourceTable}</arg>
+            <arg>-sourceDatabases</arg>
+            <arg>${sourceDatabases}</arg>
+            <arg>-sourceTables</arg>
+            <arg>${sourceTables}</arg>
             <arg>-sourceStagingPath</arg>
             <arg>${sourceStagingPath}</arg>
             <arg>-sourceNN</arg>
@@ -346,8 +344,10 @@
             <arg>${clusterForJobNNKerberosPrincipal}</arg>
             <arg>-tdeEncryptionEnabled</arg>
             <arg>${tdeEncryptionEnabled}</arg>
-            <arg>-jobName</arg>
-            <arg>${jobName}-${nominalTime}</arg>
+            <arg>-hiveJobName</arg>
+            <arg>${hiveJobName}</arg>
+            <arg>-sourceDatabase</arg>
+            <arg>${sourceDatabase}</arg>
             <arg>-executionStage</arg>
             <arg>import</arg>
         </java>
http://git-wip-us.apache.org/repos/asf/falcon/blob/ee644dd2/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-workflow.xml
----------------------------------------------------------------------
diff --git a/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-workflow.xml b/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-workflow.xml
index 9f9bf92..4f6eec5 100644
--- a/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-workflow.xml
+++ b/addons/extensions/hive-mirroring/src/main/resources/runtime/hive-mirroring-workflow.xml
@@ -46,18 +46,16 @@
             <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
             <arg>-Dmapred.job.queue.name=${queueName}</arg>
             <arg>-Dmapred.job.priority=${jobPriority}</arg>
-            <arg>-falconLibPath</arg>
-            <arg>${wf:conf("falcon.libpath")}</arg>
             <arg>-sourceCluster</arg>
             <arg>${sourceCluster}</arg>
             <arg>-sourceMetastoreUri</arg>
             <arg>${sourceMetastoreUri}</arg>
             <arg>-sourceHiveServer2Uri</arg>
             <arg>${sourceHiveServer2Uri}</arg>
-            <arg>-sourceDatabase</arg>
-            <arg>${sourceDatabase}</arg>
-            <arg>-sourceTable</arg>
-            <arg>${sourceTable}</arg>
+            <arg>-sourceDatabases</arg>
+            <arg>${sourceDatabases}</arg>
+            <arg>-sourceTables</arg>
+            <arg>${sourceTables}</arg>
             <arg>-sourceStagingPath</arg>
             <arg>${sourceStagingPath}</arg>
             <arg>-sourceNN</arg>
@@ -80,8 +78,10 @@
             <arg>${clusterForJobRunWriteEP}</arg>
             <arg>-tdeEncryptionEnabled</arg>
             <arg>${tdeEncryptionEnabled}</arg>
-            <arg>-jobName</arg>
-            <arg>${jobName}-${nominalTime}</arg>
+            <arg>-hiveJobName</arg>
+            <arg>${hiveJobName}</arg>
+            <arg>-sourceDatabase</arg>
+            <arg>${sourceDatabase}</arg>
             <arg>-executionStage</arg>
             <arg>lastevents</arg>
         </java>
@@ -118,8 +118,6 @@
             <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
             <arg>-Dmapred.job.queue.name=${queueName}</arg>
             <arg>-Dmapred.job.priority=${jobPriority}</arg>
-            <arg>-falconLibPath</arg>
-            <arg>${wf:conf("falcon.libpath")}</arg>
             <arg>-replicationMaxMaps</arg>
             <arg>${replicationMaxMaps}</arg>
             <arg>-distcpMaxMaps</arg>
@@ -130,10 +128,10 @@
             <arg>${sourceMetastoreUri}</arg>
             <arg>-sourceHiveServer2Uri</arg>
             <arg>${sourceHiveServer2Uri}</arg>
-            <arg>-sourceDatabase</arg>
-            <arg>${sourceDatabase}</arg>
-            <arg>-sourceTable</arg>
-            <arg>${sourceTable}</arg>
+            <arg>-sourceDatabases</arg>
+            <arg>${sourceDatabases}</arg>
+            <arg>-sourceTables</arg>
+            <arg>${sourceTables}</arg>
             <arg>-sourceStagingPath</arg>
             <arg>${sourceStagingPath}</arg>
             <arg>-sourceNN</arg>
@@ -158,8 +156,10 @@
             <arg>${clusterForJobRunWriteEP}</arg>
             <arg>-tdeEncryptionEnabled</arg>
             <arg>${tdeEncryptionEnabled}</arg>
-            <arg>-jobName</arg>
-            <arg>${jobName}-${nominalTime}</arg>
+            <arg>-hiveJobName</arg>
+            <arg>${hiveJobName}</arg>
+            <arg>-sourceDatabase</arg>
+            <arg>${sourceDatabase}</arg>
             <arg>-executionStage</arg>
             <arg>export</arg>
             <arg>-counterLogDir</arg>
@@ -198,8 +198,6 @@
             <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
             <arg>-Dmapred.job.queue.name=${queueName}</arg>
             <arg>-Dmapred.job.priority=${jobPriority}</arg>
-            <arg>-falconLibPath</arg>
-            <arg>${wf:conf("falcon.libpath")}</arg>
             <arg>-replicationMaxMaps</arg>
             <arg>${replicationMaxMaps}</arg>
             <arg>-distcpMaxMaps</arg>
@@ -210,10 +208,10 @@
             <arg>${sourceMetastoreUri}</arg>
             <arg>-sourceHiveServer2Uri</arg>
             <arg>${sourceHiveServer2Uri}</arg>
-            <arg>-sourceDatabase</arg>
-            <arg>${sourceDatabase}</arg>
-            <arg>-sourceTable</arg>
-            <arg>${sourceTable}</arg>
+            <arg>-sourceDatabases</arg>
+            <arg>${sourceDatabases}</arg>
+            <arg>-sourceTables</arg>
+            <arg>${sourceTables}</arg>
             <arg>-sourceStagingPath</arg>
             <arg>${sourceStagingPath}</arg>
             <arg>-sourceNN</arg>
@@ -238,8 +236,10 @@
             <arg>${clusterForJobRunWriteEP}</arg>
             <arg>-tdeEncryptionEnabled</arg>
             <arg>${tdeEncryptionEnabled}</arg>
-            <arg>-jobName</arg>
-            <arg>${jobName}-${nominalTime}</arg>
+            <arg>-hiveJobName</arg>
+            <arg>${hiveJobName}</arg>
+            <arg>-sourceDatabase</arg>
+            <arg>${sourceDatabase}</arg>
             <arg>-executionStage</arg>
             <arg>import</arg>
         </java>
http://git-wip-us.apache.org/repos/asf/falcon/blob/ee644dd2/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
index 71b9043..d891487 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
@@ -30,8 +30,9 @@ public enum HiveDRArgs {
     SOURCE_CLUSTER("sourceCluster", "source cluster"),
     SOURCE_METASTORE_URI("sourceMetastoreUri", "source meta store uri"),
     SOURCE_HS2_URI("sourceHiveServer2Uri", "source HS2 uri"),
-    SOURCE_DATABASE("sourceDatabase", "comma source databases"),
-    SOURCE_TABLE("sourceTable", "comma source tables"),
+    SOURCE_DATABASES("sourceDatabases", "comma-separated source databases"),
+    SOURCE_DATABASE("sourceDatabase", "first source database"),
+    SOURCE_TABLES("sourceTables", "comma-separated source tables"),
     SOURCE_STAGING_PATH("sourceStagingPath", "source staging path for data", false),
 
     // source hadoop endpoints
@@ -70,7 +71,7 @@ public enum HiveDRArgs {
     // Map Bandwidth
     DISTCP_MAP_BANDWIDTH("distcpMapBandwidth", "map bandwidth in mb", false),
 
-    JOB_NAME("jobName", "unique job name"),
+    JOB_NAME("hiveJobName", "unique job name"),
 
     CLUSTER_FOR_JOB_RUN("clusterForJobRun", "cluster where job runs"),
     JOB_CLUSTER_NN("clusterForJobRunWriteEP", "write end point of cluster where job runs"),
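For illustration, the renamed options carry comma-separated lists. A minimal standalone sketch (class name and values hypothetical, not part of this commit) of how a -sourceDatabases value splits into the list the tool iterates, with the singular sourceDatabase derived as the first entry, mirroring the split(",")[0] in HiveMirroringExtension below:

import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the comma-list handling behind
// -sourceDatabases / -sourceTables.
public class SourceDatabasesExample {
    public static void main(String[] args) {
        String sourceDatabases = "salesdb,marketingdb"; // illustrative value
        List<String> dbs = Arrays.asList(sourceDatabases.trim().split(","));
        String sourceDatabase = dbs.get(0); // first source database
        System.out.println(dbs + " -> " + sourceDatabase);
    }
}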
SOURCE_TABLES("sourceTables", "comma source tables"), SOURCE_STAGING_PATH("sourceStagingPath", "source staging path for data", false), // source hadoop endpoints @@ -70,7 +71,7 @@ public enum HiveDRArgs { // Map Bandwidth DISTCP_MAP_BANDWIDTH("distcpMapBandwidth", "map bandwidth in mb", false), - JOB_NAME("jobName", "unique job name"), + JOB_NAME("hiveJobName", "unique job name"), CLUSTER_FOR_JOB_RUN("clusterForJobRun", "cluster where job runs"), JOB_CLUSTER_NN("clusterForJobRunWriteEP", "write end point of cluster where job runs"), http://git-wip-us.apache.org/repos/asf/falcon/blob/ee644dd2/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java index 0096727..215be35 100644 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java @@ -63,21 +63,29 @@ public class HiveDROptions { } public List<String> getSourceDatabases() { - return Arrays.asList(context.get(HiveDRArgs.SOURCE_DATABASE).trim().split(",")); + return Arrays.asList(context.get(HiveDRArgs.SOURCE_DATABASES).trim().split(",")); } public List<String> getSourceTables() { - return Arrays.asList(context.get(HiveDRArgs.SOURCE_TABLE).trim().split(",")); + return Arrays.asList(context.get(HiveDRArgs.SOURCE_TABLES).trim().split(",")); } public String getSourceStagingPath() { + return context.get(HiveDRArgs.SOURCE_STAGING_PATH); + } + + + public void setSourceStagingPath() { String stagingPath = context.get(HiveDRArgs.SOURCE_STAGING_PATH); - if (StringUtils.isNotBlank(stagingPath)) { - stagingPath = StringUtils.removeEnd(stagingPath, File.separator); - return stagingPath + File.separator + getJobName(); + String srcStagingPath; + if ("NA".equalsIgnoreCase(stagingPath)) { + stagingPath = StringUtils.removeEnd(FileUtils.DEFAULT_EVENT_STORE_PATH, File.separator); + srcStagingPath = stagingPath + File.separator + getJobName(); } else { - return FileUtils.DEFAULT_EVENT_STORE_PATH + getJobName(); + stagingPath = StringUtils.removeEnd(stagingPath, File.separator); + srcStagingPath = stagingPath + File.separator + getJobName(); } + context.put(HiveDRArgs.SOURCE_STAGING_PATH, srcStagingPath); } public String getSourceWriteEP() { @@ -109,13 +117,20 @@ public class HiveDROptions { } public String getTargetStagingPath() { + return context.get(HiveDRArgs.TARGET_STAGING_PATH); + } + + public void setTargetStagingPath() { String stagingPath = context.get(HiveDRArgs.TARGET_STAGING_PATH); - if (StringUtils.isNotBlank(stagingPath)) { - stagingPath = StringUtils.removeEnd(stagingPath, File.separator); - return stagingPath + File.separator + getJobName(); + String targetStagingPath; + if ("NA".equalsIgnoreCase(stagingPath)) { + stagingPath = StringUtils.removeEnd(FileUtils.DEFAULT_EVENT_STORE_PATH, File.separator); + targetStagingPath = stagingPath + File.separator + getJobName(); } else { - return FileUtils.DEFAULT_EVENT_STORE_PATH + getJobName(); + stagingPath = StringUtils.removeEnd(stagingPath, File.separator); + targetStagingPath = stagingPath + File.separator + getJobName(); } + context.put(HiveDRArgs.TARGET_STAGING_PATH, targetStagingPath); } public String getReplicationMaxMaps() { @@ -151,7 +166,7 @@ public class HiveDROptions { } public static HiveDROptions create(String[] args) throws ParseException { - Map<HiveDRArgs, 
http://git-wip-us.apache.org/repos/asf/falcon/blob/ee644dd2/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
index 17eec22..e45b0d8 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
@@ -136,6 +136,13 @@ public class HiveDRTool extends Configured implements Tool {
         inputOptions = parseOptions(args);
         LOG.info("Input Options: {}", inputOptions);
 
+        // Update the source and target staging paths
+        inputOptions.setSourceStagingPath();
+        inputOptions.setTargetStagingPath();
+
+        LOG.info("srcStagingPath: {}", inputOptions.getSourceStagingPath());
+        LOG.info("tgtStagingPath: {}", inputOptions.getTargetStagingPath());
+
         Configuration sourceConf = FileUtils.getConfiguration(inputOptions.getSourceWriteEP(),
                 inputOptions.getSourceNNKerberosPrincipal());
         sourceClusterFS = FileSystem.get(sourceConf);


http://git-wip-us.apache.org/repos/asf/falcon/blob/ee644dd2/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
index 08e0551..e2297ef 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
@@ -75,7 +75,7 @@ public class CopyMapper extends Mapper<LongWritable, Text, Text, Text> {
             // In case of export stage, populate custom counters
             if (context.getConfiguration().get(HiveDRArgs.EXECUTION_STAGE.getName())
                     .equalsIgnoreCase(HiveDRUtils.ExecutionStage.EXPORT.name())
-                    && !eventUtils.isCountersMapEmtpy()) {
+                    && !eventUtils.isCountersMapEmpty()) {
                 context.getCounter(ReplicationJobCountersList.BYTESCOPIED).increment(
                         eventUtils.getCounterValue(ReplicationJobCountersList.BYTESCOPIED.getName()));
                 context.getCounter(ReplicationJobCountersList.COPY).increment(
http://git-wip-us.apache.org/repos/asf/falcon/blob/ee644dd2/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
index 3b088f7..590a7e3 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
@@ -37,7 +37,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.sql.Connection;
@@ -95,17 +94,15 @@ public class EventUtils {
         sourceDatabase = conf.get(HiveDRArgs.SOURCE_DATABASE.getName());
         sourceNN = conf.get(HiveDRArgs.SOURCE_NN.getName());
         sourceNNKerberosPrincipal = conf.get(HiveDRArgs.SOURCE_NN_KERBEROS_PRINCIPAL.getName());
-        sourceStagingPath = conf.get(HiveDRArgs.SOURCE_STAGING_PATH.getName())
-                + File.separator + conf.get(HiveDRArgs.JOB_NAME.getName());
+        sourceStagingPath = conf.get(HiveDRArgs.SOURCE_STAGING_PATH.getName());
         jobNN = conf.get(HiveDRArgs.JOB_CLUSTER_NN.getName());
         jobNNKerberosPrincipal = conf.get(HiveDRArgs.JOB_CLUSTER_NN_KERBEROS_PRINCIPAL.getName());
         targetHiveServer2Uri = conf.get(HiveDRArgs.TARGET_HS2_URI.getName());
-        targetStagingPath = conf.get(HiveDRArgs.TARGET_STAGING_PATH.getName())
-                + File.separator + conf.get(HiveDRArgs.JOB_NAME.getName());
+        targetStagingPath = conf.get(HiveDRArgs.TARGET_STAGING_PATH.getName());
         targetNN = conf.get(HiveDRArgs.TARGET_NN.getName());
         targetNNKerberosPrincipal = conf.get(HiveDRArgs.TARGET_NN_KERBEROS_PRINCIPAL.getName());
-        sourceCleanUpList = new ArrayList<Path>();
-        targetCleanUpList = new ArrayList<Path>();
+        sourceCleanUpList = new ArrayList<>();
+        targetCleanUpList = new ArrayList<>();
         countersMap = new HashMap<>();
     }
 
@@ -169,7 +166,7 @@ public class EventUtils {
     }
 
     public void processEvents(String event) throws Exception {
-        listReplicationStatus = new ArrayList<ReplicationStatus>();
+        listReplicationStatus = new ArrayList<>();
         String[] eventSplit = event.split(DelimiterUtils.FIELD_DELIM);
         String dbName = new String(Base64.decodeBase64(eventSplit[0]), "UTF-8");
         String tableName = new String(Base64.decodeBase64(eventSplit[1]), "UTF-8");
@@ -203,7 +200,7 @@ public class EventUtils {
                                             List<Path> cleanUpList, boolean isImportStatements)
         throws SQLException, HiveReplicationException, IOException {
         String[] commandList = eventStr.split(DelimiterUtils.NEWLINE_DELIM);
-        List<Command> deserializeCommand = new ArrayList<Command>();
+        List<Command> deserializeCommand = new ArrayList<>();
         for (String command : commandList) {
             Command cmd = ReplicationUtils.deserializeCommand(command);
             deserializeCommand.add(cmd);
@@ -269,7 +266,7 @@ public class EventUtils {
     }
 
     private static List<Path> getCleanUpPaths(List<String> cleanupLocations) {
-        List<Path> cleanupLocationPaths = new ArrayList<Path>();
+        List<Path> cleanupLocationPaths = new ArrayList<>();
         for (String cleanupLocation : cleanupLocations) {
             cleanupLocationPaths.add(new Path(cleanupLocation));
         }
@@ -330,7 +327,7 @@
     public DistCpOptions getDistCpOptions() {
         // DistCpOptions expects the first argument to be a file OR a list of Paths
-        List<Path> sourceUris=new ArrayList<Path>();
+        List<Path> sourceUris=new ArrayList<>();
         sourceUris.add(new Path(sourceStagingUri));
 
         DistCpOptions distcpOptions = new DistCpOptions(sourceUris, new Path(targetStagingUri));
@@ -350,8 +347,8 @@
         return countersMap.get(counterKey);
     }
 
-    public boolean isCountersMapEmtpy() {
-        return countersMap.size() == 0 ? true : false;
+    public boolean isCountersMapEmpty() {
+        return countersMap.size() == 0;
     }
 
     public void cleanEventsDirectory() throws IOException {
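Two small cleanups above are worth noting: EventUtils now reads the staging paths verbatim (the job-name suffix is appended once, up front, by the new setters in HiveDROptions), and the misspelled isCountersMapEmtpy() becomes isCountersMapEmpty() with the redundant ternary dropped. A tiny standalone sketch of the tidied check (class name hypothetical):

import java.util.HashMap;
import java.util.Map;

// Hypothetical standalone version of the renamed counters check.
public class CountersCheck {
    private static final Map<String, Long> countersMap = new HashMap<>();

    static boolean isCountersMapEmpty() {
        return countersMap.size() == 0; // boolean expression, no "? true : false"
    }

    public static void main(String[] args) {
        System.out.println(isCountersMapEmpty()); // true
        countersMap.put("BYTESCOPIED", 1024L);
        System.out.println(isCountersMapEmpty()); // false
    }
}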
http://git-wip-us.apache.org/repos/asf/falcon/blob/ee644dd2/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java
index 001d10a..ce80586 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java
@@ -33,8 +33,8 @@ import java.io.IOException;
  */
 public final class FileUtils {
 
-    public static final String DEFAULT_EVENT_STORE_PATH = DRStatusStore.BASE_DEFAULT_STORE_PATH
-            + File.separator + "Events";
+    public static final String DEFAULT_EVENT_STORE_PATH = StringUtils.removeEnd(DRStatusStore
+            .BASE_DEFAULT_STORE_PATH, File.separator) + File.separator + "Events";
     public static final FsPermission FS_PERMISSION_700 = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);


http://git-wip-us.apache.org/repos/asf/falcon/blob/ee644dd2/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java b/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java
index 1f44b62..a9c5661 100644
--- a/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java
+++ b/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java
@@ -25,8 +25,8 @@ public class DRTest {
     public void testHiveDr(String[] args) {
         String[] testArgs = {
             "-sourceMetastoreUri", "thrift://localhost:9083",
-            "-sourceDatabase", "default",
-            "-sourceTable", "test",
+            "-sourceDatabases", "default",
+            "-sourceTables", "test",
             "-sourceStagingPath", "/apps/hive/tools/dr",
             "-sourceNN", "hdfs://localhost:8020",
             "-sourceRM", "local",


http://git-wip-us.apache.org/repos/asf/falcon/blob/ee644dd2/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java
index 949aea5..75759df 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java
@@ -39,6 +39,7 @@ public class HiveMirroringExtension extends AbstractExtension {
     private static final String ALL_TABLES = "*";
     private static final String COMMA_DELIMITER = ",";
     private static final String SECURE_RESOURCE = "-secure";
+    private static final String NOT_APPLICABLE = "NA";
 
     @Override
     public String getName() {
@@ -122,6 +123,12 @@ public class HiveMirroringExtension extends AbstractExtension {
         additionalProperties.put(HiveMirroringExtensionProperties.HIVE_MIRRORING_JOB_NAME.getName(),
                 jobName);
 
+        // Get the first source DB
+        additionalProperties.put(HiveMirroringExtensionProperties.SOURCE_DATABASE.getName(),
+                extensionProperties.getProperty(HiveMirroringExtensionProperties.SOURCE_DATABASES
+                        .getName()).trim().split(",")[0]
+        );
+
         String clusterName = extensionProperties.getProperty(ExtensionProperties.CLUSTER_NAME.getName());
         // Add required properties of cluster where job should run
         additionalProperties.put(HiveMirroringExtensionProperties.CLUSTER_FOR_JOB_RUN.getName(),
@@ -230,6 +237,16 @@ public class HiveMirroringExtension extends AbstractExtension {
             additionalProperties.put(HiveMirroringExtensionProperties.TDE_ENCRYPTION_ENABLED.getName(), "false");
         }
 
+        if (StringUtils.isBlank(
+                extensionProperties.getProperty(HiveMirroringExtensionProperties.SOURCE_STAGING_PATH.getName()))) {
+            additionalProperties.put(HiveMirroringExtensionProperties.SOURCE_STAGING_PATH.getName(), NOT_APPLICABLE);
+        }
+
+        if (StringUtils.isBlank(
+                extensionProperties.getProperty(HiveMirroringExtensionProperties.TARGET_STAGING_PATH.getName()))) {
+            additionalProperties.put(HiveMirroringExtensionProperties.TARGET_STAGING_PATH.getName(), NOT_APPLICABLE);
+        }
+
         return additionalProperties;
     }
 }
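On the extension side, blank staging paths are stamped with the NOT_APPLICABLE ("NA") sentinel before the properties reach HiveDRTool, which then substitutes the default event store (see the HiveDROptions setters above). A short sketch of that defaulting, assuming commons-lang3 StringUtils and simplified property keys:

import java.util.Properties;
import org.apache.commons.lang3.StringUtils;

// Sketch of the extension-side defaulting (property keys simplified, not the real enum lookups).
public class StagingPathDefaulting {
    private static final String NOT_APPLICABLE = "NA";

    public static void main(String[] args) {
        Properties extensionProperties = new Properties(); // user-supplied; staging path omitted
        Properties additionalProperties = new Properties();

        String sourceStagingPath = extensionProperties.getProperty("sourceStagingPath");
        if (StringUtils.isBlank(sourceStagingPath)) {
            additionalProperties.setProperty("sourceStagingPath", NOT_APPLICABLE);
        }
        System.out.println(additionalProperties.getProperty("sourceStagingPath")); // NA
    }
}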
http://git-wip-us.apache.org/repos/asf/falcon/blob/ee644dd2/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java
index 6c4f58d..7e80712 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java
@@ -27,6 +27,7 @@ public enum HiveMirroringExtensionProperties {
     SOURCE_METASTORE_URI("sourceMetastoreUri", "Source Hive metastore uri", false),
     SOURCE_HS2_URI("sourceHiveServer2Uri", "Source HS2 uri"),
     SOURCE_DATABASES("sourceDatabases", "List of databases to replicate"),
+    SOURCE_DATABASE("sourceDatabase", "Database to verify the setup connection", false),
     SOURCE_TABLES("sourceTables", "List of tables to replicate", false),
     SOURCE_STAGING_PATH("sourceStagingPath", "Location of source staging path", false),
     SOURCE_NN("sourceNN", "Source name node", false),
@@ -50,13 +51,13 @@ public enum HiveMirroringExtensionProperties {
     MAX_EVENTS("maxEvents", "Maximum events to replicate", false),
     MAX_MAPS("replicationMaxMaps", "Maximum number of maps used during replication", false),
     DISTCP_MAX_MAPS("distcpMaxMaps", "Maximum number of maps used during distcp", false),
-    MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s used by each mapper during replication"),
+    MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s used by each mapper during replication", false),
     CLUSTER_FOR_JOB_RUN("clusterForJobRun", "Cluster on which replication job runs", false),
     CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL("Job cluster kerberos principal",
             "Write EP of cluster on which replication job runs", false),
     CLUSTER_FOR_JOB_RUN_WRITE_EP("clusterForJobRunWriteEP", "Write EP of cluster on which replication job runs", false),
     TDE_ENCRYPTION_ENABLED("tdeEncryptionEnabled", "Set to true if TDE encryption is enabled", false),
-    HIVE_MIRRORING_JOB_NAME("jobName", "Unique hive replication job name", false);
+    HIVE_MIRRORING_JOB_NAME("hiveJobName", "Unique hive replication job name", false);
 
     private final String name;
     private final String description;
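Putting the renames together, a hypothetical direct invocation of the tool with the new option names (values illustrative; the full argument set required in practice is longer, as the workflow XML above shows):

import org.apache.falcon.hive.HiveDRTool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical driver showing the renamed options; not part of this commit.
public class HiveDRInvocation {
    public static void main(String[] args) throws Exception {
        String[] drArgs = {
            "-sourceMetastoreUri", "thrift://localhost:9083",
            "-sourceDatabases", "default",          // was -sourceDatabase
            "-sourceTables", "test",                // was -sourceTable
            "-sourceStagingPath", "/apps/hive/tools/dr",
            "-sourceNN", "hdfs://localhost:8020",
            "-hiveJobName", "hiveJob-2016-06-07",   // was -jobName
        };
        System.exit(ToolRunner.run(new HiveDRTool(), drArgs));
    }
}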
