http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml ---------------------------------------------------------------------- diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml new file mode 100644 index 0000000..c441998 --- /dev/null +++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml @@ -0,0 +1,293 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<workflow-app xmlns='uri:oozie:workflow:0.3' name='falcon-dr-hive-workflow'> + <start to='last-event'/> + <action name="last-event"> + <java> + <job-tracker>${jobTracker}</job-tracker> + <name-node>${nameNode}</name-node> + <configuration> + <property> <!-- hadoop 2 parameter --> + <name>oozie.launcher.mapreduce.job.user.classpath.first</name> + <value>true</value> + </property> + <property> + <name>mapred.job.queue.name</name> + <value>${queueName}</value> + </property> + <property> + <name>oozie.launcher.mapred.job.priority</name> + <value>${jobPriority}</value> + </property> + <property> + <name>oozie.use.system.libpath</name> + <value>true</value> + </property> + <property> + <name>oozie.action.sharelib.for.java</name> + <value>distcp,hive,hive2,hcatalog</value> + </property> + </configuration> + <main-class>org.apache.falcon.hive.HiveDRTool</main-class> + <arg>-Dmapred.job.queue.name=${queueName}</arg> + <arg>-Dmapred.job.priority=${jobPriority}</arg> + <arg>-falconLibPath</arg> + <arg>${wf:conf("falcon.libpath")}</arg> + <arg>-sourceCluster</arg> + <arg>${sourceCluster}</arg> + <arg>-sourceMetastoreUri</arg> + <arg>${sourceMetastoreUri}</arg> + <arg>-sourceHiveServer2Uri</arg> + <arg>${sourceHiveServer2Uri}</arg> + <arg>-sourceDatabase</arg> + <arg>${sourceDatabase}</arg> + <arg>-sourceTable</arg> + <arg>${sourceTable}</arg> + <arg>-sourceStagingPath</arg> + <arg>${sourceStagingPath}</arg> + <arg>-sourceNN</arg> + <arg>${sourceNN}</arg> + <arg>-targetCluster</arg> + <arg>${targetCluster}</arg> + <arg>-targetMetastoreUri</arg> + <arg>${targetMetastoreUri}</arg> + <arg>-targetHiveServer2Uri</arg> + <arg>${targetHiveServer2Uri}</arg> + <arg>-targetStagingPath</arg> + <arg>${targetStagingPath}</arg> + <arg>-targetNN</arg> + <arg>${targetNN}</arg> + <arg>-maxEvents</arg> + <arg>${maxEvents}</arg> + <arg>-clusterForJobRun</arg> + <arg>${clusterForJobRun}</arg> + <arg>-clusterForJobRunWriteEP</arg> + <arg>${clusterForJobRunWriteEP}</arg> + <arg>-drJobName</arg> + <arg>${drJobName}-${nominalTime}</arg> + <arg>-executionStage</arg> + <arg>lastevents</arg> + </java> + <ok to="export-dr-replication"/> + <error to="failure"/> + 
</action> + <!-- Export Replication action --> + <action name="export-dr-replication"> + <java> + <job-tracker>${jobTracker}</job-tracker> + <name-node>${nameNode}</name-node> + <configuration> + <property> <!-- hadoop 2 parameter --> + <name>oozie.launcher.mapreduce.job.user.classpath.first</name> + <value>true</value> + </property> + <property> + <name>mapred.job.queue.name</name> + <value>${queueName}</value> + </property> + <property> + <name>oozie.launcher.mapred.job.priority</name> + <value>${jobPriority}</value> + </property> + <property> + <name>oozie.use.system.libpath</name> + <value>true</value> + </property> + <property> + <name>oozie.action.sharelib.for.java</name> + <value>distcp,hive,hive2,hcatalog</value> + </property> + </configuration> + <main-class>org.apache.falcon.hive.HiveDRTool</main-class> + <arg>-Dmapred.job.queue.name=${queueName}</arg> + <arg>-Dmapred.job.priority=${jobPriority}</arg> + <arg>-falconLibPath</arg> + <arg>${wf:conf("falcon.libpath")}</arg> + <arg>-replicationMaxMaps</arg> + <arg>${replicationMaxMaps}</arg> + <arg>-distcpMaxMaps</arg> + <arg>${distcpMaxMaps}</arg> + <arg>-sourceCluster</arg> + <arg>${sourceCluster}</arg> + <arg>-sourceMetastoreUri</arg> + <arg>${sourceMetastoreUri}</arg> + <arg>-sourceHiveServer2Uri</arg> + <arg>${sourceHiveServer2Uri}</arg> + <arg>-sourceDatabase</arg> + <arg>${sourceDatabase}</arg> + <arg>-sourceTable</arg> + <arg>${sourceTable}</arg> + <arg>-sourceStagingPath</arg> + <arg>${sourceStagingPath}</arg> + <arg>-sourceNN</arg> + <arg>${sourceNN}</arg> + <arg>-targetCluster</arg> + <arg>${targetCluster}</arg> + <arg>-targetMetastoreUri</arg> + <arg>${targetMetastoreUri}</arg> + <arg>-targetHiveServer2Uri</arg> + <arg>${targetHiveServer2Uri}</arg> + <arg>-targetStagingPath</arg> + <arg>${targetStagingPath}</arg> + <arg>-targetNN</arg> + <arg>${targetNN}</arg> + <arg>-maxEvents</arg> + <arg>${maxEvents}</arg> + <arg>-distcpMapBandwidth</arg> + <arg>${distcpMapBandwidth}</arg> + <arg>-clusterForJobRun</arg> + <arg>${clusterForJobRun}</arg> + <arg>-clusterForJobRunWriteEP</arg> + <arg>${clusterForJobRunWriteEP}</arg> + <arg>-drJobName</arg> + <arg>${drJobName}-${nominalTime}</arg> + <arg>-executionStage</arg> + <arg>export</arg> + </java> + <ok to="import-dr-replication"/> + <error to="failure"/> + </action> + <!-- Import Replication action --> + <action name="import-dr-replication"> + <java> + <job-tracker>${jobTracker}</job-tracker> + <name-node>${nameNode}</name-node> + <configuration> + <property> <!-- hadoop 2 parameter --> + <name>oozie.launcher.mapreduce.job.user.classpath.first</name> + <value>true</value> + </property> + <property> + <name>mapred.job.queue.name</name> + <value>${queueName}</value> + </property> + <property> + <name>oozie.launcher.mapred.job.priority</name> + <value>${jobPriority}</value> + </property> + <property> + <name>oozie.use.system.libpath</name> + <value>true</value> + </property> + <property> + <name>oozie.action.sharelib.for.java</name> + <value>distcp,hive,hive2,hcatalog</value> + </property> + </configuration> + <main-class>org.apache.falcon.hive.HiveDRTool</main-class> + <arg>-Dmapred.job.queue.name=${queueName}</arg> + <arg>-Dmapred.job.priority=${jobPriority}</arg> + <arg>-falconLibPath</arg> + <arg>${wf:conf("falcon.libpath")}</arg> + <arg>-replicationMaxMaps</arg> + <arg>${replicationMaxMaps}</arg> + <arg>-distcpMaxMaps</arg> + <arg>${distcpMaxMaps}</arg> + <arg>-sourceCluster</arg> + <arg>${sourceCluster}</arg> + <arg>-sourceMetastoreUri</arg> + <arg>${sourceMetastoreUri}</arg> + 
<arg>-sourceHiveServer2Uri</arg> + <arg>${sourceHiveServer2Uri}</arg> + <arg>-sourceDatabase</arg> + <arg>${sourceDatabase}</arg> + <arg>-sourceTable</arg> + <arg>${sourceTable}</arg> + <arg>-sourceStagingPath</arg> + <arg>${sourceStagingPath}</arg> + <arg>-sourceNN</arg> + <arg>${sourceNN}</arg> + <arg>-targetCluster</arg> + <arg>${targetCluster}</arg> + <arg>-targetMetastoreUri</arg> + <arg>${targetMetastoreUri}</arg> + <arg>-targetHiveServer2Uri</arg> + <arg>${targetHiveServer2Uri}</arg> + <arg>-targetStagingPath</arg> + <arg>${targetStagingPath}</arg> + <arg>-targetNN</arg> + <arg>${targetNN}</arg> + <arg>-maxEvents</arg> + <arg>${maxEvents}</arg> + <arg>-distcpMapBandwidth</arg> + <arg>${distcpMapBandwidth}</arg> + <arg>-clusterForJobRun</arg> + <arg>${clusterForJobRun}</arg> + <arg>-clusterForJobRunWriteEP</arg> + <arg>${clusterForJobRunWriteEP}</arg> + <arg>-drJobName</arg> + <arg>${drJobName}-${nominalTime}</arg> + <arg>-executionStage</arg> + <arg>import</arg> + </java> + <ok to="success"/> + <error to="failure"/> + </action> + <decision name="success"> + <switch> + <case to="successAlert"> + ${drNotificationReceivers ne 'NA'} + </case> + <default to="end"/> + </switch> + </decision> + <decision name="failure"> + <switch> + <case to="failureAlert"> + ${drNotificationReceivers ne 'NA'} + </case> + <default to="fail"/> + </switch> + </decision> + <action name="successAlert"> + <email xmlns="uri:oozie:email-action:0.2"> + <to>${drNotificationReceivers}</to> + <subject>INFO: Hive DR workflow ${drJobName} completed successfully</subject> + <body> + The Hive DR workflow ${wf:id()} is successful. + Source = ${sourceCluster} + Target = ${targetCluster} + DB Name = ${sourceDatabase} + Table Name = ${sourceTable} + </body> + </email> + <ok to="end"/> + <error to="end"/> + </action> + <action name="failureAlert"> + <email xmlns="uri:oozie:email-action:0.2"> + <to>${drNotificationReceivers}</to> + <subject>ERROR: Hive DR workflow ${drJobName} failed</subject> + <body> + The Hive DR workflow ${wf:id()} had issues and was killed. The error message is: ${wf:errorMessage(wf:lastErrorNode())} + Source = ${sourceCluster} + Target = ${targetCluster} + DB Name = ${sourceDatabase} + Table Name = ${sourceTable} + </body> + </email> + <ok to="end"/> + <error to="fail"/> + </action> + <kill name="fail"> + <message> + Workflow action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + </message> + </kill> + <end name="end"/> +</workflow-app>
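The three java actions in the workflow above ("last-event", "export-dr-replication", "import-dr-replication") invoke org.apache.falcon.hive.HiveDRTool with nearly identical argument lists; only the -executionStage value (lastevents, export, import) and the distcp tuning flags carried by the export/import stages differ. The following standalone sketch illustrates that layout. It is not part of the patch, and the endpoint and value strings are placeholders rather than values taken from the commit; at run time Oozie substitutes the real ${...} parameters from the scheduled process.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class HiveDrStagesSketch {

    // Arguments shared by all three workflow actions (placeholder values).
    private static List<String> commonArgs() {
        return new ArrayList<>(Arrays.asList(
                "-sourceCluster", "primaryCluster",
                "-sourceMetastoreUri", "thrift://source:9083",
                "-sourceHiveServer2Uri", "hive2://source:10000",
                "-sourceDatabase", "default",
                "-sourceTable", "testtable_dr",
                "-sourceStagingPath", "/apps/hive/tools/dr",
                "-sourceNN", "hdfs://source:8020",
                "-targetCluster", "backupCluster",
                "-targetMetastoreUri", "thrift://target:9083",
                "-targetHiveServer2Uri", "hive2://target:10000",
                "-targetStagingPath", "/apps/hive/tools/dr",
                "-targetNN", "hdfs://target:8020",
                "-maxEvents", "-1",
                "-clusterForJobRun", "backupCluster",
                "-clusterForJobRunWriteEP", "hdfs://target:8020",
                "-drJobName", "hive-disaster-recovery-2014-10-01T00-00Z"));
    }

    public static void main(String[] args) {
        for (String stage : new String[]{"lastevents", "export", "import"}) {
            List<String> toolArgs = commonArgs();
            if (!"lastevents".equals(stage)) {
                // Only the export and import actions carry the replication/distcp tuning flags.
                toolArgs.addAll(Arrays.asList(
                        "-replicationMaxMaps", "5",
                        "-distcpMaxMaps", "1",
                        "-distcpMapBandwidth", "100"));
            }
            toolArgs.add("-executionStage");
            toolArgs.add(stage);
            // Oozie hands an equivalent argument array to the HiveDRTool main class.
            System.out.println(stage + " -> " + toolArgs);
        }
    }
}

Keeping the three stages in separate Oozie actions lets a failed export or import be retried without re-querying the metastore for the last replicated event.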
http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery.properties ---------------------------------------------------------------------- diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery.properties b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery.properties new file mode 100644 index 0000000..42ae30b --- /dev/null +++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery.properties @@ -0,0 +1,94 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +##### NOTE: This is a TEMPLATE file which can be copied and edited + +##### Recipe properties +falcon.recipe.name=hive-disaster-recovery + + +##### Workflow properties +falcon.recipe.workflow.name=hive-dr-workflow +# Provide Wf absolute path. This can be HDFS or local FS path. If WF is on local FS it will be copied to HDFS +falcon.recipe.workflow.path=/recipes/hive-replication/hive-disaster-recovery-workflow.xml + +##### Cluster properties + +# Change the cluster name where replication job should run here +falcon.recipe.cluster.name=backupCluster +# Change the cluster hdfs write end point here. This is mandatory. +falcon.recipe.cluster.hdfs.writeEndPoint=hdfs://localhost:8020 +# Change the cluster validity start time here +falcon.recipe.cluster.validity.start=2014-10-01T00:00Z +# Change the cluster validity end time here +falcon.recipe.cluster.validity.end=2016-12-30T00:00Z + +##### Scheduling properties + +# Change the process frequency here. Valid frequency types are minutes, hours, days, months +falcon.recipe.process.frequency=minutes(60) + +##### Retry policy properties + +falcon.recipe.retry.policy=periodic +falcon.recipe.retry.delay=minutes(30) +falcon.recipe.retry.attempts=3 + +##### Tag properties - An optional list of comma separated tags, Key Value Pairs, separated by comma +##### Uncomment to add tags +#falcon.recipe.tags=owner=landing,pipeline=adtech + +##### ACL properties - Uncomment and change ACL if authorization is enabled + +#falcon.recipe.acl.owner=testuser +#falcon.recipe.acl.group=group +#falcon.recipe.acl.permission=0x755 + +##### Custom Job properties + +##### Source Cluster DR properties +sourceCluster=primaryCluster +sourceMetastoreUri=thrift://localhost:9083 +sourceHiveServer2Uri=hive2://localhost:10000 +# For DB level replication to replicate multiple databases specify comma separated list of databases +sourceDatabase=default +# For DB level replication specify * for sourceTable.
+# For table level replication to replicate multiple tables specify comma separated list of tables +sourceTable=testtable_dr +sourceStagingPath=/apps/hive/tools/dr +sourceNN=hdfs://localhost:8020 + +##### Target Cluster DR properties +targetCluster=backupCluster +targetMetastoreUri=thrift://localhost:9083 +targetHiveServer2Uri=hive2://localhost:10000 +targetStagingPath=/apps/hive/tools/dr +targetNN=hdfs://localhost:8020 + +# To ceil the max events processed each time job runs. Set it to max value depending on your bandwidth limit. +# Setting it to -1 will process all the events but can hog up the bandwidth. Use it judiciously! +maxEvents=-1 +# Change it to specify the maximum number of mappers for replication +replicationMaxMaps=5 +# Change it to specify the maximum number of mappers for DistCP +distcpMaxMaps=1 +# Change it to specify the bandwidth in MB for each mapper in DistCP +distcpMapBandwidth=100 + +##### Email on failure +drNotificationReceivers=NA \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/pom.xml ---------------------------------------------------------------------- diff --git a/client/pom.xml b/client/pom.xml index afa91c9..c162125 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -105,6 +105,12 @@ <groupId>org.testng</groupId> <artifactId>testng</artifactId> </dependency> + + <dependency> + <groupId>org.apache.hive.hcatalog</groupId> + <artifactId>hive-webhcat-java-client</artifactId> + <version>${hive.version}</version> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/cli/FalconCLI.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java index 148f789..11f6bff 100644 --- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java +++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java @@ -124,8 +124,17 @@ public class FalconCLI { // Recipe Command public static final String RECIPE_CMD = "recipe"; public static final String RECIPE_NAME = "name"; + public static final String RECIPE_OPERATION= "operation"; public static final String RECIPE_TOOL_CLASS_NAME = "tool"; + /** + * Recipe operation enum. 
+ */ + public static enum RecipeOperation { + HDFS_REPLICATION, + HIVE_DISASTER_RECOVERY + } + private final Properties clientProperties; public FalconCLI() throws Exception { @@ -914,6 +923,9 @@ public class FalconCLI { Option recipeToolClassName = new Option(RECIPE_TOOL_CLASS_NAME, true, "recipe class"); recipeOptions.addOption(recipeToolClassName); + Option recipeOperation = new Option(RECIPE_OPERATION, true, "recipe operation"); + recipeOptions.addOption(recipeOperation); + return recipeOptions; } @@ -1005,11 +1017,23 @@ public class FalconCLI { private void recipeCommand(CommandLine commandLine, FalconClient client) throws FalconCLIException { String recipeName = commandLine.getOptionValue(RECIPE_NAME); String recipeToolClass = commandLine.getOptionValue(RECIPE_TOOL_CLASS_NAME); + String recipeOperation = commandLine.getOptionValue(RECIPE_OPERATION); validateNotEmpty(recipeName, RECIPE_NAME); + validateNotEmpty(recipeOperation, RECIPE_OPERATION); + validateRecipeOperations(recipeOperation); - String result = - client.submitRecipe(recipeName, recipeToolClass).getMessage(); + String result = client.submitRecipe(recipeName, recipeToolClass, recipeOperation).toString(); OUT.get().println(result); } + + private static void validateRecipeOperations(String recipeOperation) throws FalconCLIException { + for(RecipeOperation operation : RecipeOperation.values()) { + if (operation.toString().equalsIgnoreCase(recipeOperation)) { + return; + } + } + throw new FalconCLIException("Allowed Recipe operations: " + + java.util.Arrays.asList((RecipeOperation.values()))); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index 9649e10..d9bdf64 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -963,7 +963,8 @@ public class FalconClient extends AbstractFalconClient { } public APIResult submitRecipe(String recipeName, - String recipeToolClassName) throws FalconCLIException { + String recipeToolClassName, + final String recipeOperation) throws FalconCLIException { String recipePath = clientProperties.getProperty("falcon.recipe.path"); if (StringUtils.isEmpty(recipePath)) { @@ -999,6 +1000,7 @@ public class FalconClient extends AbstractFalconClient { "-" + RecipeToolArgs.RECIPE_FILE_ARG.getName(), recipeFilePath, "-" + RecipeToolArgs.RECIPE_PROPERTIES_FILE_ARG.getName(), propertiesFilePath, "-" + RecipeToolArgs.RECIPE_PROCESS_XML_FILE_PATH_ARG.getName(), processFile, + "-" + RecipeToolArgs.RECIPE_OPERATION_ARG.getName(), recipeOperation, }; if (recipeToolClassName != null) { http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeTool.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeTool.java b/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeTool.java new file mode 100644 index 0000000..cf24078 --- /dev/null +++ b/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeTool.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.recipe; + +import org.apache.commons.lang3.StringUtils; + +import java.util.Properties; +import java.io.File; + +/** + * Hdfs Replication recipe tool for Falcon recipes. + */ +public class HdfsReplicationRecipeTool implements Recipe { + + private static final String COMMA_SEPARATOR = ","; + + @Override + public void validate(final Properties recipeProperties) { + for (HdfsReplicationRecipeToolOptions option : HdfsReplicationRecipeToolOptions.values()) { + if (recipeProperties.getProperty(option.getName()) == null && option.isRequired()) { + throw new IllegalArgumentException("Missing argument: " + option.getName()); + } + } + } + + @Override + public Properties getAdditionalSystemProperties(final Properties recipeProperties) { + Properties additionalProperties = new Properties(); + + // Construct fully qualified hdfs src path + String srcPaths = recipeProperties.getProperty(HdfsReplicationRecipeToolOptions + .REPLICATION_SOURCE_DIR.getName()); + StringBuilder absoluteSrcPaths = new StringBuilder(); + String srcFsPath = recipeProperties.getProperty( + HdfsReplicationRecipeToolOptions.REPLICATION_SOURCE_CLUSTER_FS_WRITE_ENDPOINT.getName()); + if (StringUtils.isNotEmpty(srcFsPath)) { + srcFsPath = StringUtils.removeEnd(srcFsPath, File.separator); + } + if (StringUtils.isNotEmpty(srcPaths)) { + String[] paths = srcPaths.split(COMMA_SEPARATOR); + + for (String path : paths) { + StringBuilder srcpath = new StringBuilder(srcFsPath); + srcpath.append(path.trim()); + srcpath.append(COMMA_SEPARATOR); + absoluteSrcPaths.append(srcpath); + } + } + + additionalProperties.put(HdfsReplicationRecipeToolOptions.REPLICATION_SOURCE_DIR.getName(), + StringUtils.removeEnd(absoluteSrcPaths.toString(), COMMA_SEPARATOR)); + return additionalProperties; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeToolOptions.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeToolOptions.java b/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeToolOptions.java new file mode 100644 index 0000000..4c3b543 --- /dev/null +++ b/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeToolOptions.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.recipe; + +/** + * Hdfs Recipe tool options. + */ +public enum HdfsReplicationRecipeToolOptions { + REPLICATION_SOURCE_DIR("drSourceDir", "Location of source data to replicate"), + REPLICATION_SOURCE_CLUSTER_FS_WRITE_ENDPOINT("drSourceClusterFS", "Source replication cluster end point"), + REPLICATION_TARGET_DIR("drTargetDir", "Location on target cluster for replication"), + REPLICATION_TARGET_CLUSTER_FS_WRITE_ENDPOINT("drTargetClusterFS", "Target replication cluster end point"), + REPLICATION_MAX_MAPS("distcpMaxMaps", "Maximum number of maps used during replication"), + REPLICATION_MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s used by each mapper during replication"); + + private final String name; + private final String description; + private final boolean isRequired; + + HdfsReplicationRecipeToolOptions(String name, String description) { + this(name, description, true); + } + + HdfsReplicationRecipeToolOptions(String name, String description, boolean isRequired) { + this.name = name; + this.description = description; + this.isRequired = isRequired; + } + + public String getName() { + return this.name; + } + + public String getDescription() { + return description; + } + + public boolean isRequired() { + return isRequired; + } + + @Override + public String toString() { + return getName(); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java b/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java new file mode 100644 index 0000000..8b39673 --- /dev/null +++ b/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.falcon.recipe; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hive.hcatalog.api.HCatClient; +import org.apache.hive.hcatalog.api.HCatDatabase; +import org.apache.hive.hcatalog.api.HCatTable; +import org.apache.hive.hcatalog.api.ObjectNotFoundException; +import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; +import org.apache.hive.hcatalog.common.HCatException; + +import java.io.IOException; +import java.util.Properties; + +/** + * Hive Replication recipe tool for Falcon recipes. + */ +public class HiveReplicationRecipeTool implements Recipe { + private static final String ALL_TABLES = "*"; + + @Override + public void validate(final Properties recipeProperties) throws Exception { + for (HiveReplicationRecipeToolOptions option : HiveReplicationRecipeToolOptions.values()) { + if (recipeProperties.getProperty(option.getName()) == null && option.isRequired()) { + throw new IllegalArgumentException("Missing argument: " + option.getName()); + } + } + + HCatClient sourceMetastoreClient = null; + HCatClient targetMetastoreClient = null; + try { + // Validate if DB exists - source and target + sourceMetastoreClient = getHiveMetaStoreClient( + recipeProperties.getProperty(HiveReplicationRecipeToolOptions + .REPLICATION_SOURCE_METASTORE_URI.getName()), + recipeProperties.getProperty(HiveReplicationRecipeToolOptions + .REPLICATION_SOURCE_HIVE_METASTORE_KERBEROS_PRINCIPAL.getName()), + recipeProperties.getProperty(HiveReplicationRecipeToolOptions + .REPLICATION_SOURCE_HIVE2_KERBEROS_PRINCIPAL.getName())); + + String sourceDbList = recipeProperties.getProperty( + HiveReplicationRecipeToolOptions.REPLICATION_SOURCE_DATABASE.getName()); + + if (StringUtils.isEmpty(sourceDbList)) { + throw new Exception("No source DB specified in property file"); + } + + String sourceTableList = recipeProperties.getProperty( + HiveReplicationRecipeToolOptions.REPLICATION_SOURCE_TABLE.getName()); + if (StringUtils.isEmpty(sourceTableList)) { + throw new Exception("No source table specified in property file. 
For DB replication please specify * " + + "for sourceTable"); + } + + String[] srcDbs = sourceDbList.split(","); + if (srcDbs.length <= 0) { + throw new Exception("No source DB specified in property file"); + } + for (String db : srcDbs) { + if (!dbExists(sourceMetastoreClient, db)) { + throw new Exception("Database " + db + " doesn't exist on source cluster"); + } + } + + if (!sourceTableList.equals(ALL_TABLES)) { + String[] srcTables = sourceTableList.split(","); + if (srcTables.length > 0) { + for (String table : srcTables) { + if (!tableExists(sourceMetastoreClient, srcDbs[0], table)) { + throw new Exception("Table " + table + " doesn't exist on source cluster"); + } + } + } + } + + targetMetastoreClient = getHiveMetaStoreClient( + recipeProperties.getProperty(HiveReplicationRecipeToolOptions + .REPLICATION_TARGET_METASTORE_URI.getName()), + recipeProperties.getProperty(HiveReplicationRecipeToolOptions + .REPLICATION_TARGET_HIVE_METASTORE_KERBEROS_PRINCIPAL.getName()), + recipeProperties.getProperty(HiveReplicationRecipeToolOptions + .REPLICATION_TARGET_HIVE2_KERBEROS_PRINCIPAL.getName())); + // Verify db exists on target + for (String db : srcDbs) { + if (!dbExists(targetMetastoreClient, db)) { + throw new Exception("Database " + db + " doesn't exist on target cluster"); + } + } + } finally { + if (sourceMetastoreClient != null) { + sourceMetastoreClient.close(); + } + if (targetMetastoreClient != null) { + targetMetastoreClient.close(); + } + } + } + + @Override + public Properties getAdditionalSystemProperties(final Properties recipeProperties) { + Properties additionalProperties = new Properties(); + String recipeName = recipeProperties.getProperty(RecipeToolOptions.RECIPE_NAME.getName()); + // Add recipe name as Hive DR job + additionalProperties.put(HiveReplicationRecipeToolOptions.HIVE_DR_JOB_NAME.getName(), recipeName); + additionalProperties.put(HiveReplicationRecipeToolOptions.CLUSTER_FOR_JOB_RUN.getName(), + recipeProperties.getProperty(RecipeToolOptions.CLUSTER_NAME.getName())); + additionalProperties.put(HiveReplicationRecipeToolOptions.CLUSTER_FOR_JOB_RUN_WRITE_EP.getName(), + recipeProperties.getProperty(RecipeToolOptions.CLUSTER_HDFS_WRITE_ENDPOINT.getName())); + if (StringUtils.isNotEmpty(recipeProperties.getProperty(RecipeToolOptions.RECIPE_NN_PRINCIPAL.getName()))) { + additionalProperties.put(HiveReplicationRecipeToolOptions.CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL.getName(), + recipeProperties.getProperty(RecipeToolOptions.RECIPE_NN_PRINCIPAL.getName())); + } + return additionalProperties; + } + + private HCatClient getHiveMetaStoreClient(String metastoreUrl, String metastorePrincipal, + String hive2Principal) throws Exception { + try { + HiveConf hcatConf = createHiveConf(new Configuration(false), metastoreUrl, + metastorePrincipal, hive2Principal); + return HCatClient.create(hcatConf); + } catch (IOException e) { + throw new Exception("Exception creating HCatClient: " + e.getMessage(), e); + } + } + + private static HiveConf createHiveConf(Configuration conf, String metastoreUrl, String metastorePrincipal, + String hive2Principal) throws IOException { + HiveConf hcatConf = new HiveConf(conf, HiveConf.class); + + hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUrl); + hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3); + hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname, + HCatSemanticAnalyzer.class.getName()); + hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + + 
hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + if (StringUtils.isNotEmpty(metastorePrincipal)) { + hcatConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, metastorePrincipal); + hcatConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true"); + hcatConf.set(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI.varname, "true"); + } + if (StringUtils.isNotEmpty(hive2Principal)) { + hcatConf.set(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL.varname, hive2Principal); + hcatConf.set(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION.varname, "kerberos"); + } + + return hcatConf; + } + + private static boolean tableExists(HCatClient client, final String database, final String tableName) + throws Exception { + try { + HCatTable table = client.getTable(database, tableName); + return table != null; + } catch (ObjectNotFoundException e) { + System.out.println(e.getMessage()); + return false; + } catch (HCatException e) { + throw new Exception("Exception checking if the table exists:" + e.getMessage(), e); + } + } + + private static boolean dbExists(HCatClient client, final String database) + throws Exception { + try { + HCatDatabase db = client.getDatabase(database); + return db != null; + } catch (ObjectNotFoundException e) { + System.out.println(e.getMessage()); + return false; + } catch (HCatException e) { + throw new Exception("Exception checking if the db exists:" + e.getMessage(), e); + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java b/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java new file mode 100644 index 0000000..ec0465d --- /dev/null +++ b/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.recipe; + +/** + * Hive Recipe tool options. 
+ */ +public enum HiveReplicationRecipeToolOptions { + REPLICATION_SOURCE_CLUSTER("sourceCluster", "Replication source cluster name"), + REPLICATION_SOURCE_METASTORE_URI("sourceMetastoreUri", "Source Hive metastore uri"), + REPLICATION_SOURCE_HS2_URI("sourceHiveServer2Uri", "Source HS2 uri"), + REPLICATION_SOURCE_DATABASE("sourceDatabase", "List of databases to replicate"), + REPLICATION_SOURCE_TABLE("sourceTable", "List of tables to replicate"), + REPLICATION_SOURCE_STAGING_PATH("sourceStagingPath", "Location of source staging path"), + REPLICATION_SOURCE_NN("sourceNN", "Source name node"), + REPLICATION_SOURCE_NN_KERBEROS_PRINCIPAL("sourceNNKerberosPrincipal", "Source name node kerberos principal", false), + REPLICATION_SOURCE_HIVE_METASTORE_KERBEROS_PRINCIPAL("sourceHiveMetastoreKerberosPrincipal", + "Source hive metastore kerberos principal", false), + REPLICATION_SOURCE_HIVE2_KERBEROS_PRINCIPAL("sourceHive2KerberosPrincipal", + "Source hiveserver2 kerberos principal", false), + + REPLICATION_TARGET_CLUSTER("targetCluster", "Replication target cluster name"), + REPLICATION_TARGET_METASTORE_URI("targetMetastoreUri", "Target Hive metastore uri"), + REPLICATION_TARGET_HS2_URI("targetHiveServer2Uri", "Target HS2 uri"), + REPLICATION_TARGET_STAGING_PATH("targetStagingPath", "Location of target staging path"), + REPLICATION_TARGET_NN("targetNN", "Target name node"), + REPLICATION_TARGET_NN_KERBEROS_PRINCIPAL("targetNNKerberosPrincipal", "Target name node kerberos principal", false), + REPLICATION_TARGET_HIVE_METASTORE_KERBEROS_PRINCIPAL("targetHiveMetastoreKerberosPrincipal", + "Target hive metastore kerberos principal", false), + REPLICATION_TARGET_HIVE2_KERBEROS_PRINCIPAL("targetHive2KerberosPrincipal", + "Target hiveserver2 kerberos principal", false), + + REPLICATION_MAX_EVENTS("maxEvents", "Maximum events to replicate"), + REPLICATION_MAX_MAPS("replicationMaxMaps", "Maximum number of maps used during replication"), + DISTCP_MAX_MAPS("distcpMaxMaps", "Maximum number of maps used during distcp"), + REPLICATION_MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s used by each mapper during replication"), + CLUSTER_FOR_JOB_RUN("clusterForJobRun", "Cluster on which replication job runs", false), + CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL("clusterForJobNNKerberosPrincipal", + "Write EP of cluster on which replication job runs", false), + CLUSTER_FOR_JOB_RUN_WRITE_EP("clusterForJobRunWriteEP", "Write EP of cluster on which replication job runs", false), + HIVE_DR_JOB_NAME("drJobName", "Unique hive DR job name", false); + + private final String name; + private final String description; + private final boolean isRequired; + + HiveReplicationRecipeToolOptions(String name, String description) { + this(name, description, true); + } + + HiveReplicationRecipeToolOptions(String name, String description, boolean isRequired) { + this.name = name; + this.description = description; + this.isRequired = isRequired; + } + + public String getName() { + return this.name; + } + + public String getDescription() { + return description; + } + + public boolean isRequired() { + return isRequired; + } + + @Override + public String toString() { + return getName(); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/Recipe.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/recipe/Recipe.java b/client/src/main/java/org/apache/falcon/recipe/Recipe.java new file mode 100644 index 
0000000..609131d --- /dev/null +++ b/client/src/main/java/org/apache/falcon/recipe/Recipe.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.recipe; + +import java.util.Properties; + +/** + * Recipe interface. + */ +public interface Recipe { + void validate(final Properties recipeProperties) throws Exception; + Properties getAdditionalSystemProperties(final Properties recipeProperties); +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/RecipeFactory.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/recipe/RecipeFactory.java b/client/src/main/java/org/apache/falcon/recipe/RecipeFactory.java new file mode 100644 index 0000000..32b0871 --- /dev/null +++ b/client/src/main/java/org/apache/falcon/recipe/RecipeFactory.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.recipe; + +import org.apache.falcon.cli.FalconCLI.RecipeOperation; + +/** + * Recipe factory. 
+ */ +public final class RecipeFactory { + + private RecipeFactory() { + } + + public static Recipe getRecipeToolType(String recipeType) { + if (recipeType == null) { + return null; + } + + if (RecipeOperation.HDFS_REPLICATION.toString().equalsIgnoreCase(recipeType)) { + return new HdfsReplicationRecipeTool(); + } else if (RecipeOperation.HIVE_DISASTER_RECOVERY.toString().equalsIgnoreCase(recipeType)) { + return new HiveReplicationRecipeTool(); + } + return null; + } + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/RecipeTool.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/recipe/RecipeTool.java b/client/src/main/java/org/apache/falcon/recipe/RecipeTool.java index 069db9f..243ff4d 100644 --- a/client/src/main/java/org/apache/falcon/recipe/RecipeTool.java +++ b/client/src/main/java/org/apache/falcon/recipe/RecipeTool.java @@ -19,44 +19,44 @@ package org.apache.falcon.recipe; import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.ParseException; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; import org.apache.commons.io.FileUtils; import org.apache.commons.io.FilenameUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.recipe.util.RecipeProcessBuilderUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import org.apache.commons.cli.Options; -import java.io.BufferedReader; import java.io.File; -import java.io.InputStream; -import java.io.IOException; import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.FileReader; -import java.io.OutputStream; -import java.util.Map; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.security.PrivilegedExceptionAction; import java.util.HashMap; +import java.util.Map; import java.util.Properties; -import java.util.regex.Matcher; -import java.util.regex.Pattern; /** * Base recipe tool for Falcon recipes. 
*/ public class RecipeTool extends Configured implements Tool { private static final String HDFS_WF_PATH = "falcon" + File.separator + "recipes" + File.separator; - private static final String RECIPE_PREFIX = "falcon.recipe."; - private static final Pattern RECIPE_VAR_PATTERN = Pattern.compile("##[A-Za-z0-9_.]*##"); - - private FileSystem hdfsFileSystem; + private static final FsPermission FS_PERMISSION = + new FsPermission(FsAction.ALL, FsAction.READ, FsAction.NONE); + private static final String FS_DEFAULT_NAME_KEY = "fs.defaultFS"; + private static final String NN_PRINCIPAL = "dfs.namenode.kerberos.principal"; public static void main(String[] args) throws Exception { ToolRunner.run(new Configuration(), new RecipeTool(), args); @@ -64,25 +64,38 @@ public class RecipeTool extends Configured implements Tool { @Override public int run(String[] arguments) throws Exception { + Map<RecipeToolArgs, String> argMap = setupArgs(arguments); if (argMap == null || argMap.isEmpty()) { throw new Exception("Arguments passed to recipe is null"); } - + Configuration conf = getConf(); String recipePropertiesFilePath = argMap.get(RecipeToolArgs.RECIPE_PROPERTIES_FILE_ARG); Properties recipeProperties = loadProperties(recipePropertiesFilePath); validateProperties(recipeProperties); - FileSystem fs = getFileSystemForHdfs(recipeProperties); + String recipeOperation = argMap.get(RecipeToolArgs.RECIPE_OPERATION_ARG); + Recipe recipeType = RecipeFactory.getRecipeToolType(recipeOperation); + if (recipeType != null) { + recipeType.validate(recipeProperties); + Properties props = recipeType.getAdditionalSystemProperties(recipeProperties); + if (props != null && !props.isEmpty()) { + recipeProperties.putAll(props); + } + } + String processFilename; + + FileSystem fs = getFileSystemForHdfs(recipeProperties, conf); validateArtifacts(recipeProperties, fs); - String recipeName = FilenameUtils.getBaseName(recipePropertiesFilePath); + String recipeName = recipeProperties.getProperty(RecipeToolOptions.RECIPE_NAME.getName()); copyFilesToHdfsIfRequired(recipeProperties, fs, recipeName); - Map<String, String> overlayMap = getOverlay(recipeProperties); - String processFilename = overlayParametersOverTemplate(argMap.get(RecipeToolArgs.RECIPE_FILE_ARG), - argMap.get(RecipeToolArgs.RECIPE_PROCESS_XML_FILE_PATH_ARG), overlayMap); + processFilename = RecipeProcessBuilderUtils.createProcessFromTemplate(argMap.get(RecipeToolArgs + .RECIPE_FILE_ARG), recipeProperties, argMap.get(RecipeToolArgs.RECIPE_PROCESS_XML_FILE_PATH_ARG)); + + System.out.println("Generated process file to be scheduled: "); System.out.println(FileUtils.readFileToString(new File(processFilename))); @@ -98,7 +111,7 @@ public class RecipeTool extends Configured implements Tool { addOption(options, arg, arg.isRequired()); } - CommandLine cmd = new GnuParser().parse(options, arguments); + CommandLine cmd = new GnuParser().parse(options, arguments); for (RecipeToolArgs arg : RecipeToolArgs.values()) { String optionValue = arg.getOptionValue(cmd); if (StringUtils.isNotEmpty(optionValue)) { @@ -135,7 +148,7 @@ public class RecipeTool extends Configured implements Tool { } } - private static void validateArtifacts(final Properties recipeProperties, final FileSystem fs) throws Exception{ + private static void validateArtifacts(final Properties recipeProperties, final FileSystem fs) throws Exception { // validate the WF path String wfPath = recipeProperties.getProperty(RecipeToolOptions.WORKFLOW_PATH.getName()); @@ -156,53 +169,6 @@ public class RecipeTool extends Configured 
implements Tool { } } - private static Map<String, String> getOverlay(final Properties recipeProperties) { - Map<String, String> overlay = new HashMap<String, String>(); - for (Map.Entry<Object, Object> entry : recipeProperties.entrySet()) { - String key = StringUtils.removeStart((String) entry.getKey(), RECIPE_PREFIX); - overlay.put(key, (String) entry.getValue()); - } - - return overlay; - } - - private static String overlayParametersOverTemplate(final String templateFile, - final String outFilename, - Map<String, String> overlay) throws Exception { - if (templateFile == null || outFilename == null || overlay == null || overlay.isEmpty()) { - throw new IllegalArgumentException("Invalid arguments passed"); - } - - String line; - OutputStream out = null; - BufferedReader reader = null; - - try { - out = new FileOutputStream(outFilename); - - reader = new BufferedReader(new FileReader(templateFile)); - while ((line = reader.readLine()) != null) { - Matcher matcher = RECIPE_VAR_PATTERN.matcher(line); - while (matcher.find()) { - String variable = line.substring(matcher.start(), matcher.end()); - String paramString = overlay.get(variable.substring(2, variable.length() - 2)); - if (paramString == null) { - throw new Exception("Match not found for the template: " + variable - + ". Please add it in recipe properties file"); - } - line = line.replace(variable, paramString); - matcher = RECIPE_VAR_PATTERN.matcher(line); - } - out.write(line.getBytes()); - out.write("\n".getBytes()); - } - } finally { - IOUtils.closeQuietly(reader); - IOUtils.closeQuietly(out); - } - return outFilename; - } - private static void copyFilesToHdfsIfRequired(final Properties recipeProperties, final FileSystem fs, final String recipeName) throws Exception { @@ -262,7 +228,7 @@ public class RecipeTool extends Configured implements Tool { private static void createDirOnHdfs(String path, FileSystem fs) throws IOException { Path hdfsPath = new Path(path); if (!fs.exists(hdfsPath)) { - fs.mkdirs(hdfsPath); + FileSystem.mkdirs(fs, hdfsPath, FS_PERMISSION); } } @@ -287,19 +253,33 @@ public class RecipeTool extends Configured implements Tool { fs.copyFromLocalFile(false, true, new Path(localFilePath), new Path(hdfsFilePath)); } - private static Configuration getConfiguration(final String storageEndpoint) throws Exception { - Configuration conf = new Configuration(); - conf.set("fs.defaultFS", storageEndpoint); - return conf; + private FileSystem getFileSystemForHdfs(final Properties recipeProperties, + final Configuration conf) throws Exception { + String storageEndpoint = RecipeToolOptions.CLUSTER_HDFS_WRITE_ENDPOINT.getName(); + String nameNode = recipeProperties.getProperty(storageEndpoint); + conf.set(FS_DEFAULT_NAME_KEY, nameNode); + if (UserGroupInformation.isSecurityEnabled()) { + String nameNodePrincipal = recipeProperties.getProperty(RecipeToolOptions.RECIPE_NN_PRINCIPAL.getName()); + conf.set(NN_PRINCIPAL, nameNodePrincipal); + } + return createFileSystem(UserGroupInformation.getLoginUser(), new URI(nameNode), conf); } - private FileSystem getFileSystemForHdfs(final Properties recipeProperties) throws Exception { - if (hdfsFileSystem == null) { - String storageEndpoint = RecipeToolOptions.SOURCE_CLUSTER_HDFS_WRITE_ENDPOINT.getName(); - hdfsFileSystem = FileSystem.get( - getConfiguration(recipeProperties.getProperty(storageEndpoint))); - } + private FileSystem createFileSystem(UserGroupInformation ugi, final URI uri, + final Configuration conf) throws Exception { + try { + final String proxyUserName = 
ugi.getShortUserName(); + if (proxyUserName.equals(UserGroupInformation.getLoginUser().getShortUserName())) { + return FileSystem.get(uri, conf); + } - return hdfsFileSystem; + return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() { + public FileSystem run() throws Exception { + return FileSystem.get(uri, conf); + } + }); + } catch (InterruptedException ex) { + throw new IOException("Exception creating FileSystem:" + ex.getMessage(), ex); + } } } http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/RecipeToolArgs.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/recipe/RecipeToolArgs.java b/client/src/main/java/org/apache/falcon/recipe/RecipeToolArgs.java index baa4846..79d8f18 100644 --- a/client/src/main/java/org/apache/falcon/recipe/RecipeToolArgs.java +++ b/client/src/main/java/org/apache/falcon/recipe/RecipeToolArgs.java @@ -25,10 +25,11 @@ import org.apache.commons.cli.Option; * Recipe tool args. */ public enum RecipeToolArgs { - RECIPE_FILE_ARG("file", "recipe file path"), + RECIPE_FILE_ARG("file", "recipe template file path"), RECIPE_PROPERTIES_FILE_ARG("propertiesFile", "recipe properties file path"), RECIPE_PROCESS_XML_FILE_PATH_ARG( - "recipeProcessFilePath", "file path of recipe process to be submitted"); + "recipeProcessFilePath", "file path of recipe process to be submitted"), + RECIPE_OPERATION_ARG("recipeOperation", "recipe operation"); private final String name; private final String description; http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/RecipeToolOptions.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/recipe/RecipeToolOptions.java b/client/src/main/java/org/apache/falcon/recipe/RecipeToolOptions.java index a1c29cd..5df9b0a 100644 --- a/client/src/main/java/org/apache/falcon/recipe/RecipeToolOptions.java +++ b/client/src/main/java/org/apache/falcon/recipe/RecipeToolOptions.java @@ -18,19 +18,43 @@ package org.apache.falcon.recipe; +import java.util.Map; +import java.util.HashMap; + /** * Recipe tool options. 
*/ public enum RecipeToolOptions { - SOURCE_CLUSTER_HDFS_WRITE_ENDPOINT( - "falcon.recipe.src.cluster.hdfs.writeEndPoint", "source cluster HDFS write endpoint"), + RECIPE_NAME("falcon.recipe.name", "Recipe name", false), + CLUSTER_NAME("falcon.recipe.cluster.name", "Cluster name where replication job should run", false), + CLUSTER_HDFS_WRITE_ENDPOINT( + "falcon.recipe.cluster.hdfs.writeEndPoint", "Cluster HDFS write endpoint"), + CLUSTER_VALIDITY_START("falcon.recipe.cluster.validity.start", "Source cluster validity start", false), + CLUSTER_VALIDITY_END("falcon.recipe.cluster.validity.end", "Source cluster validity end", false), + WORKFLOW_NAME("falcon.recipe.workflow.name", "Workflow name", false), WORKFLOW_PATH("falcon.recipe.workflow.path", "Workflow path", false), - WORKFLOW_LIB_PATH("falcon.recipe.workflow.lib.path", "WF lib path", false); + WORKFLOW_LIB_PATH("falcon.recipe.workflow.lib.path", "WF lib path", false), + PROCESS_FREQUENCY("falcon.recipe.process.frequency", "Process frequency", false), + RETRY_POLICY("falcon.recipe.retry.policy", "Retry policy", false), + RETRY_DELAY("falcon.recipe.retry.delay", "Retry delay", false), + RETRY_ATTEMPTS("falcon.recipe.retry.attempts", "Retry attempts", false), + RECIPE_TAGS("falcon.recipe.tags", "Recipe tags", false), + RECIPE_ACL_OWNER("falcon.recipe.acl.owner", "Recipe acl owner", false), + RECIPE_ACL_GROUP("falcon.recipe.acl.group", "Recipe acl group", false), + RECIPE_ACL_PERMISSION("falcon.recipe.acl.permission", "Recipe acl permission", false), + RECIPE_NN_PRINCIPAL("falcon.recipe.nn.principal", "Recipe DFS NN principal", false); private final String name; private final String description; private final boolean isRequired; + public static final Map<String, RecipeToolOptions> OPTIONSMAP = new HashMap<>(); + static { + for (RecipeToolOptions c : RecipeToolOptions.values()) { + OPTIONSMAP.put(c.getName(), c); + } + } + RecipeToolOptions(String name, String description) { this(name, description, true); } http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/client/src/main/java/org/apache/falcon/recipe/util/RecipeProcessBuilderUtils.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/recipe/util/RecipeProcessBuilderUtils.java b/client/src/main/java/org/apache/falcon/recipe/util/RecipeProcessBuilderUtils.java new file mode 100644 index 0000000..9522816 --- /dev/null +++ b/client/src/main/java/org/apache/falcon/recipe/util/RecipeProcessBuilderUtils.java @@ -0,0 +1,272 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.falcon.recipe.util; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.Frequency; +import org.apache.falcon.entity.v0.SchemaHelper; +import org.apache.falcon.entity.v0.process.ACL; +import org.apache.falcon.entity.v0.process.Cluster; +import org.apache.falcon.entity.v0.process.PolicyType; +import org.apache.falcon.entity.v0.process.Property; +import org.apache.falcon.entity.v0.process.Retry; +import org.apache.falcon.entity.v0.process.Workflow; +import org.apache.falcon.recipe.RecipeToolOptions; + +import javax.xml.bind.JAXBException; +import javax.xml.bind.Unmarshaller; +import javax.xml.bind.ValidationEvent; +import javax.xml.bind.ValidationEventHandler; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.OutputStream; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Recipe builder utility. + */ +public final class RecipeProcessBuilderUtils { + + private static final Pattern RECIPE_VAR_PATTERN = Pattern.compile("##[A-Za-z0-9_.]*##"); + + private RecipeProcessBuilderUtils() { + } + + public static String createProcessFromTemplate(final String processTemplateFile, final Properties recipeProperties, + final String processFilename) throws Exception { + org.apache.falcon.entity.v0.process.Process process = bindAttributesInTemplate( + processTemplateFile, recipeProperties); + String recipeProcessFilename = createProcessXmlFile(processFilename, process); + + validateProcessXmlFile(recipeProcessFilename); + return recipeProcessFilename; + } + + private static org.apache.falcon.entity.v0.process.Process + bindAttributesInTemplate(final String templateFile, final Properties recipeProperties) + throws Exception { + if (templateFile == null || recipeProperties == null) { + throw new IllegalArgumentException("Invalid arguments passed"); + } + + Unmarshaller unmarshaller = EntityType.PROCESS.getUnmarshaller(); + // Validation can be skipped for unmarshalling as we want to bind template with the properties. Validation is + // handled as part of marshalling + unmarshaller.setSchema(null); + unmarshaller.setEventHandler(new ValidationEventHandler() { + public boolean handleEvent(ValidationEvent validationEvent) { + return true; + } + } + ); + + URL processResourceUrl = new File(templateFile).toURI().toURL(); + org.apache.falcon.entity.v0.process.Process process = + (org.apache.falcon.entity.v0.process.Process) unmarshaller.unmarshal(processResourceUrl); + + /* For optional properties user might directly set them in the process xml and might not set it in properties + file.
Before doing the submission validation is done to confirm process xml doesn't have RECIPE_VAR_PATTERN + */ + + String processName = recipeProperties.getProperty(RecipeToolOptions.RECIPE_NAME.getName()); + if (StringUtils.isNotEmpty(processName)) { + process.setName(processName); + } + + // DR process template has only one cluster + bindClusterProperties(process.getClusters().getClusters().get(0), recipeProperties); + + // bind scheduling properties + String processFrequency = recipeProperties.getProperty(RecipeToolOptions.PROCESS_FREQUENCY.getName()); + if (StringUtils.isNotEmpty(processFrequency)) { + process.setFrequency(Frequency.fromString(processFrequency)); + } + + bindWorkflowProperties(process.getWorkflow(), recipeProperties); + bindRetryProperties(process.getRetry(), recipeProperties); + bindACLProperties(process.getACL(), recipeProperties); + bindTagsProperties(process, recipeProperties); + bindCustomProperties(process.getProperties(), recipeProperties); + + return process; + } + + private static void bindClusterProperties(final Cluster cluster, + final Properties recipeProperties) { + // DR process template has only one cluster + String clusterName = recipeProperties.getProperty(RecipeToolOptions.CLUSTER_NAME.getName()); + if (StringUtils.isNotEmpty(clusterName)) { + cluster.setName(clusterName); + } + + String clusterStartValidity = recipeProperties.getProperty(RecipeToolOptions.CLUSTER_VALIDITY_START.getName()); + if (StringUtils.isNotEmpty(clusterStartValidity)) { + cluster.getValidity().setStart(SchemaHelper.parseDateUTC(clusterStartValidity)); + } + + String clusterEndValidity = recipeProperties.getProperty(RecipeToolOptions.CLUSTER_VALIDITY_END.getName()); + if (StringUtils.isNotEmpty(clusterEndValidity)) { + cluster.getValidity().setEnd(SchemaHelper.parseDateUTC(clusterEndValidity)); + } + } + + private static void bindWorkflowProperties(final Workflow wf, + final Properties recipeProperties) { + String wfName = recipeProperties.getProperty(RecipeToolOptions.WORKFLOW_NAME.getName()); + if (StringUtils.isNotEmpty(wfName)) { + wf.setName(wfName); + } + + String wfLibPath = recipeProperties.getProperty(RecipeToolOptions.WORKFLOW_LIB_PATH.getName()); + if (StringUtils.isNotEmpty(wfLibPath)) { + wf.setLib(wfLibPath); + } else if (wf.getLib().startsWith("##")) { + wf.setLib(""); + } + + String wfPath = recipeProperties.getProperty(RecipeToolOptions.WORKFLOW_PATH.getName()); + if (StringUtils.isNotEmpty(wfPath)) { + wf.setPath(wfPath); + } + } + + private static void bindRetryProperties(final Retry processRetry, + final Properties recipeProperties) { + String retryPolicy = recipeProperties.getProperty(RecipeToolOptions.RETRY_POLICY.getName()); + if (StringUtils.isNotEmpty(retryPolicy)) { + processRetry.setPolicy(PolicyType.fromValue(retryPolicy)); + } + + String retryAttempts = recipeProperties.getProperty(RecipeToolOptions.RETRY_ATTEMPTS.getName()); + if (StringUtils.isNotEmpty(retryAttempts)) { + processRetry.setAttempts(Integer.parseInt(retryAttempts)); + } + + String retryDelay = recipeProperties.getProperty(RecipeToolOptions.RETRY_DELAY.getName()); + if (StringUtils.isNotEmpty(retryDelay)) { + processRetry.setDelay(Frequency.fromString(retryDelay)); + } + } + + private static void bindACLProperties(final ACL acl, + final Properties recipeProperties) { + String aclowner = recipeProperties.getProperty(RecipeToolOptions.RECIPE_ACL_OWNER.getName()); + if (StringUtils.isNotEmpty(aclowner)) { + acl.setOwner(aclowner); + } + + String aclGroup = 
recipeProperties.getProperty(RecipeToolOptions.RECIPE_ACL_GROUP.getName()); + if (StringUtils.isNotEmpty(aclGroup)) { + acl.setGroup(aclGroup); + } + + String aclPermission = recipeProperties.getProperty(RecipeToolOptions.RECIPE_ACL_PERMISSION.getName()); + if (StringUtils.isNotEmpty(aclPermission)) { + acl.setPermission(aclPermission); + } + } + + private static void bindTagsProperties(final org.apache.falcon.entity.v0.process.Process process, + final Properties recipeProperties) { + String falconSystemTags = process.getTags(); + String tags = recipeProperties.getProperty(RecipeToolOptions.RECIPE_TAGS.getName()); + if (StringUtils.isNotEmpty(tags)) { + if (StringUtils.isNotEmpty(falconSystemTags)) { + tags += ", " + falconSystemTags; + } + process.setTags(tags); + } + } + + + private static void bindCustomProperties(final org.apache.falcon.entity.v0.process.Properties customProperties, + final Properties recipeProperties) { + List<Property> propertyList = new ArrayList<>(); + + for (Map.Entry<Object, Object> recipeProperty : recipeProperties.entrySet()) { + if (RecipeToolOptions.OPTIONSMAP.get(recipeProperty.getKey().toString()) == null) { + addProperty(propertyList, (String) recipeProperty.getKey(), (String) recipeProperty.getValue()); + } + } + + customProperties.getProperties().addAll(propertyList); + } + + private static void addProperty(List<Property> propertyList, String name, String value) { + Property prop = new Property(); + prop.setName(name); + prop.setValue(value); + propertyList.add(prop); + } + + private static String createProcessXmlFile(final String outFilename, + final Entity entity) throws Exception { + if (outFilename == null || entity == null) { + throw new IllegalArgumentException("Invalid arguments passed"); + } + + EntityType type = EntityType.PROCESS; + OutputStream out = null; + try { + out = new FileOutputStream(outFilename); + type.getMarshaller().marshal(entity, out); + } catch (JAXBException e) { + throw new Exception("Unable to serialize the entity object " + type + "/" + entity.getName(), e); + } finally { + IOUtils.closeQuietly(out); + } + return outFilename; + } + + private static void validateProcessXmlFile(final String processFileName) throws Exception { + if (processFileName == null) { + throw new IllegalArgumentException("Invalid arguments passed"); + } + + String line; + BufferedReader reader = null; + + try { + reader = new BufferedReader(new FileReader(processFileName)); + while ((line = reader.readLine()) != null) { + Matcher matcher = RECIPE_VAR_PATTERN.matcher(line); + if (matcher.find()) { + String variable = line.substring(matcher.start(), matcher.end()); + throw new Exception("Match not found for the template: " + variable + + " in recipe template file. Please add it in recipe properties file"); + } + } + } finally { + IOUtils.closeQuietly(reader); + } + + } + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/docs/src/site/twiki/InstallationSteps.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/InstallationSteps.twiki b/docs/src/site/twiki/InstallationSteps.twiki index 3dd034b..90d765b 100644 --- a/docs/src/site/twiki/InstallationSteps.twiki +++ b/docs/src/site/twiki/InstallationSteps.twiki @@ -31,6 +31,8 @@ It builds and installs the package into the local repository, for use as a depen [optionally -Doozie.version=<<oozie version>> can be appended to build with a specific version of Oozie. 
Oozie versions >= 4 are supported] NOTE: Falcon builds with JDK 1.7 using -noverify option + To compile Falcon with Hive Replication, optionally "-P hadoop-2,hivedr" can be appended. For this Hive >= 1.2.0 + and Oozie >= 4.2.0 should be available. http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java index e3de6a4..49fb4f7 100644 --- a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java +++ b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java @@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.util.Arrays; import java.util.Properties; /** @@ -47,10 +48,15 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener private static final String[] LIBS = StartupProperties.get().getProperty("shared.libs").split(","); - private static final FalconPathFilter NON_FALCON_JAR_FILTER = new FalconPathFilter() { + private static class FalconLibPath implements FalconPathFilter { + private String[] shareLibs; + FalconLibPath(String[] libList) { + this.shareLibs = Arrays.copyOf(libList, libList.length); + } + @Override public boolean accept(Path path) { - for (String jarName : LIBS) { + for (String jarName : shareLibs) { if (path.getName().startsWith(jarName)) { return true; } @@ -60,7 +66,7 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener @Override public String getJarName(Path path) { - for (String jarName : LIBS) { + for (String jarName : shareLibs) { if (path.getName().startsWith(jarName)) { return jarName; } @@ -84,9 +90,10 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener "lib"); Path libext = new Path(ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath(), "libext"); + FalconPathFilter nonFalconJarFilter = new FalconLibPath(LIBS); Properties properties = StartupProperties.get(); pushLibsToHDFS(fs, properties.getProperty("system.lib.location"), lib, - NON_FALCON_JAR_FILTER); + nonFalconJarFilter); pushLibsToHDFS(fs, properties.getProperty("libext.paths"), libext, null); pushLibsToHDFS(fs, properties.getProperty("libext.feed.paths"), new Path(libext, EntityType.FEED.name()) , null); @@ -107,7 +114,6 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener if (StringUtils.isEmpty(src)) { return; } - LOG.debug("Copying libs from {}", src); createTargetPath(fs, target); http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 34a5471..ec04bdf 100644 --- a/pom.xml +++ b/pom.xml @@ -130,7 +130,7 @@ <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> <scope>provided</scope> - <exclusions> + <exclusions> <exclusion> <groupId>com.sun.jersey</groupId> <artifactId>jersey-server</artifactId> @@ -151,12 +151,12 @@ <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-annotations</artifactId> </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <version>${hadoop.version}</version> + </exclusions> + </dependency> + 
<dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>${hadoop.version}</version> <scope>provided</scope> <exclusions> <exclusion> @@ -166,10 +166,10 @@ </exclusions> </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <version>${hadoop.version}</version> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> <scope>provided</scope> <exclusions> <exclusion> @@ -179,10 +179,10 @@ </exclusions> </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-common</artifactId> - <version>${hadoop.version}</version> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-common</artifactId> + <version>${hadoop.version}</version> <scope>provided</scope> <exclusions> <exclusion> @@ -200,18 +200,18 @@ <scope>test</scope> </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <version>${hadoop.version}</version> - <classifier>tests</classifier> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + <classifier>tests</classifier> <scope>test</scope> </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-server-resourcemanager</artifactId> - <version>${hadoop.version}</version> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-resourcemanager</artifactId> + <version>${hadoop.version}</version> <scope>provided</scope> </dependency> @@ -241,6 +241,13 @@ <version>${hadoop.version}</version> <scope>provided</scope> </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>${hadoop.version}</version> + <scope>provided</scope> + </dependency> </dependencies> </dependencyManagement> </profile> @@ -255,7 +262,7 @@ <descriptors> <descriptor>src/main/assemblies/distributed-package.xml</descriptor> <descriptor>src/main/assemblies/src-package.xml</descriptor> - </descriptors> + </descriptors> <finalName>apache-falcon-distributed-${project.version}</finalName> </configuration> </plugin> @@ -368,11 +375,47 @@ <properties> <excluded.test.groups/> </properties> - </profile> + </profile> + <profile> + <id>hivedr</id> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-enforcer-plugin</artifactId> + <version>1.3.1</version> + <executions> + <execution> + <id>enforce-property</id> + <goals> + <goal>enforce</goal> + </goals> + <configuration> + <rules> + <requireProperty> + <property>hive.version</property> + <regex>^(1.2.*)</regex> + <regexMessage>HiveDR only supports hive version >= 1.2.0</regexMessage> + <property>oozie.version</property> + <regex>^(4.2.*)</regex> + <regexMessage>HiveDR only supports oozie version >= 4.2.0</regexMessage> + </requireProperty> + </rules> + <fail>true</fail> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + <modules> + <module>addons/hivedr</module> + </modules> + </profile> </profiles> <modules> - <module>falcon-ui</module> + <module>falcon-ui</module> <module>checkstyle</module> <module>build-tools</module> <module>client</module> @@ -882,12 +925,15 @@ <!-- this is needed for embedded oozie --> <dependency> - <groupId>org.apache.hive.hcatalog</groupId> - 
<artifactId>hive-webhcat-java-client</artifactId> + <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> <version>${hive.version}</version> <exclusions> <exclusion> - <!-- This implies you cannot use orc files --> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + </exclusion> + <exclusion> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> </exclusion> @@ -907,6 +953,18 @@ </dependency> <dependency> + <groupId>org.apache.hive.hcatalog</groupId> + <artifactId>hive-webhcat-java-client</artifactId> + <version>${hive.version}</version> + <exclusions> + <exclusion> <!-- conflict with hadoop-auth --> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> <groupId>com.github.stephenc.findbugs</groupId> <artifactId>findbugs-annotations</artifactId> <version>1.3.9-1</version> @@ -1034,7 +1092,7 @@ <version>2.8.1</version> </plugin> - <plugin> + <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <version>2.16</version> http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/replication/pom.xml ---------------------------------------------------------------------- diff --git a/replication/pom.xml b/replication/pom.xml index 8c4d6b4..43b6463 100644 --- a/replication/pom.xml +++ b/replication/pom.xml @@ -26,9 +26,9 @@ <artifactId>falcon-main</artifactId> <version>0.7-SNAPSHOT</version> </parent> - <artifactId>falcon-replication</artifactId> - <description>Apache Falcon Replication Module</description> - <name>Apache Falcon Replication</name> + <artifactId>falcon-distcp-replication</artifactId> + <description>Apache Falcon Distcp Replication Module</description> + <name>Apache Falcon Distcp Replication</name> <packaging>jar</packaging> <profiles> @@ -46,6 +46,10 @@ <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-distcp</artifactId> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + </dependency> </dependencies> </profile> </profiles>
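For readers following this commit, here is a minimal usage sketch of the new client-side recipe builder, not part of the patch itself. The driver class, file paths, and property values below are hypothetical placeholders; only RecipeProcessBuilderUtils.createProcessFromTemplate() and the RecipeToolOptions keys come from the code added above, and the template is assumed to already define a cluster, workflow, retry, and ACL section for the bind methods to fill in.

import java.util.Properties;

import org.apache.falcon.recipe.RecipeToolOptions;
import org.apache.falcon.recipe.util.RecipeProcessBuilderUtils;

/** Hypothetical driver, for illustration only. */
public final class RecipeTemplateExample {
    public static void main(String[] args) throws Exception {
        Properties recipeProperties = new Properties();
        // Keys defined in RecipeToolOptions are bound onto the generated process entity;
        // any other key is copied through as a custom process property.
        recipeProperties.setProperty(RecipeToolOptions.RECIPE_NAME.getName(), "hive-dr-sales");
        recipeProperties.setProperty(RecipeToolOptions.CLUSTER_NAME.getName(), "backupCluster");
        recipeProperties.setProperty(RecipeToolOptions.PROCESS_FREQUENCY.getName(), "hours(1)");
        recipeProperties.setProperty("sourceDatabase", "sales"); // custom property, passed through unchanged

        // Unmarshals the template, binds the properties, writes the process xml,
        // and fails if any ##VAR## placeholder is left unresolved.
        String processXml = RecipeProcessBuilderUtils.createProcessFromTemplate(
                "/tmp/hive-disaster-recovery-template.xml",   // hypothetical template path
                recipeProperties,
                "/tmp/hive-dr-process.xml");                  // hypothetical output path
        System.out.println("Generated process entity: " + processXml);
    }
}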
