Repository: falcon Updated Branches: refs/heads/master fbd9fbedb -> 823c7d1d8
FALCON-1962 Extesnion related bugs Author: Sowmya Ramesh <[email protected]> Reviewers: "Venkat <[email protected]>" Closes #155 from sowmyaramesh/master Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/823c7d1d Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/823c7d1d Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/823c7d1d Branch: refs/heads/master Commit: 823c7d1d8397be19b6fe94a7477bebcb288c94fc Parents: fbd9fbe Author: Sowmya Ramesh <[email protected]> Authored: Fri May 20 13:27:26 2016 -0700 Committer: Sowmya Ramesh <[email protected]> Committed: Fri May 20 13:27:26 2016 -0700 ---------------------------------------------------------------------- .../mirroring/hive/HiveMirroringExtension.java | 38 +++++++++++--------- .../util/ExtensionProcessBuilderUtils.java | 34 ++++++++---------- 2 files changed, 35 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/823c7d1d/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java index 49b3a12..949aea5 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java @@ -53,10 +53,11 @@ public class HiveMirroringExtension extends AbstractExtension { } } - Cluster srcCluster = ClusterHelper.getCluster(HiveMirroringExtensionProperties.SOURCE_CLUSTER.getName()); + String srcClusterName = extensionProperties.getProperty(HiveMirroringExtensionProperties.SOURCE_CLUSTER + .getName()); + Cluster srcCluster = ClusterHelper.getCluster(srcClusterName); if (srcCluster == null) { - throw new FalconException("Cluster entity " + HiveMirroringExtensionProperties.SOURCE_CLUSTER.getName() - + " not found"); + throw new FalconException("Cluster entity " + srcClusterName + " not found"); } String srcClusterCatalogUrl = ClusterHelper.getRegistryEndPoint(srcCluster); Configuration srcClusterConf = ClusterHelper.getConfiguration(srcCluster); @@ -96,10 +97,11 @@ public class HiveMirroringExtension extends AbstractExtension { } // Verify db exists on target - Cluster targetCluster = ClusterHelper.getCluster(HiveMirroringExtensionProperties.TARGET_CLUSTER.getName()); + String targetClusterName = extensionProperties.getProperty(HiveMirroringExtensionProperties.TARGET_CLUSTER + .getName()); + Cluster targetCluster = ClusterHelper.getCluster(targetClusterName); if (targetCluster == null) { - throw new FalconException("Cluster entity " + HiveMirroringExtensionProperties.TARGET_CLUSTER.getName() - + " not found"); + throw new FalconException("Cluster entity " + targetClusterName + " not found"); } String targetClusterCatalogUrl = ClusterHelper.getRegistryEndPoint(targetCluster); Configuration targetClusterConf = ClusterHelper.getConfiguration(targetCluster); @@ -118,15 +120,15 @@ public class HiveMirroringExtension extends AbstractExtension { String jobName = extensionProperties.getProperty(ExtensionProperties.JOB_NAME.getName()); // Add job name as Hive DR job additionalProperties.put(HiveMirroringExtensionProperties.HIVE_MIRRORING_JOB_NAME.getName(), - jobName + System.currentTimeMillis()); + jobName); + String clusterName = extensionProperties.getProperty(ExtensionProperties.CLUSTER_NAME.getName()); // Add required properties of cluster where job should run additionalProperties.put(HiveMirroringExtensionProperties.CLUSTER_FOR_JOB_RUN.getName(), - extensionProperties.getProperty(ExtensionProperties.CLUSTER_NAME.getName())); - Cluster jobCluster = ClusterHelper.getCluster(ExtensionProperties.CLUSTER_NAME.getName()); + clusterName); + Cluster jobCluster = ClusterHelper.getCluster(clusterName); if (jobCluster == null) { - throw new FalconException("Cluster entity " + ExtensionProperties.CLUSTER_NAME.getName() - + " not found"); + throw new FalconException("Cluster entity " + clusterName + " not found"); } additionalProperties.put(HiveMirroringExtensionProperties.CLUSTER_FOR_JOB_RUN_WRITE_EP.getName(), ClusterHelper.getStorageUrl(jobCluster)); @@ -139,10 +141,11 @@ public class HiveMirroringExtension extends AbstractExtension { } // Properties for src cluster - Cluster srcCluster = ClusterHelper.getCluster(HiveMirroringExtensionProperties.SOURCE_CLUSTER.getName()); + String srcClusterName = extensionProperties.getProperty(HiveMirroringExtensionProperties.SOURCE_CLUSTER + .getName()); + Cluster srcCluster = ClusterHelper.getCluster(srcClusterName); if (srcCluster == null) { - throw new FalconException("Cluster entity " + HiveMirroringExtensionProperties.SOURCE_CLUSTER.getName() - + " not found"); + throw new FalconException("Cluster entity " + srcClusterName + " not found"); } additionalProperties.put(HiveMirroringExtensionProperties.SOURCE_METASTORE_URI.getName(), ClusterHelper.getRegistryEndPoint(srcCluster)); @@ -171,10 +174,11 @@ public class HiveMirroringExtension extends AbstractExtension { } // Properties for target cluster - Cluster targetCluster = ClusterHelper.getCluster(HiveMirroringExtensionProperties.TARGET_CLUSTER.getName()); + String targetClusterName = extensionProperties.getProperty(HiveMirroringExtensionProperties.TARGET_CLUSTER + .getName()); + Cluster targetCluster = ClusterHelper.getCluster(targetClusterName); if (targetCluster == null) { - throw new FalconException("Cluster entity " + HiveMirroringExtensionProperties.TARGET_CLUSTER.getName() - + " not found"); + throw new FalconException("Cluster entity " + targetClusterName + " not found"); } additionalProperties.put(HiveMirroringExtensionProperties.TARGET_METASTORE_URI.getName(), ClusterHelper.getRegistryEndPoint(targetCluster)); http://git-wip-us.apache.org/repos/asf/falcon/blob/823c7d1d/extensions/src/main/java/org/apache/falcon/extensions/util/ExtensionProcessBuilderUtils.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/util/ExtensionProcessBuilderUtils.java b/extensions/src/main/java/org/apache/falcon/extensions/util/ExtensionProcessBuilderUtils.java index 9e23894..286df3e 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/util/ExtensionProcessBuilderUtils.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/util/ExtensionProcessBuilderUtils.java @@ -60,10 +60,10 @@ public final class ExtensionProcessBuilderUtils { } public static Entity createProcessFromTemplate(final String processTemplate, - final String extensionName, - final Properties extensionProperties, - final String wfPath, - final String wfLibPath) throws FalconException { + final String extensionName, + final Properties extensionProperties, + final String wfPath, + final String wfLibPath) throws FalconException { if (StringUtils.isBlank(processTemplate) || StringUtils.isBlank(extensionName) || extensionProperties == null || StringUtils.isBlank(wfPath)) { throw new FalconException("Invalid arguments passed to extension builder"); @@ -230,32 +230,26 @@ public final class ExtensionProcessBuilderUtils { private static void bindACLProperties(final ACL acl, final Properties extensionProperties) throws FalconException { - if (!SecurityUtil.isAuthorizationEnabled()) { - return; - } + String aclOwner = extensionProperties.getProperty(ExtensionProperties.JOB_ACL_OWNER.getName()); + String aclGroup = extensionProperties.getProperty(ExtensionProperties.JOB_ACL_GROUP.getName()); + String aclPermission = extensionProperties.getProperty(ExtensionProperties.JOB_ACL_PERMISSION.getName()); - String aclowner = extensionProperties.getProperty(ExtensionProperties.JOB_ACL_OWNER.getName()); - if (StringUtils.isNotEmpty(aclowner)) { - acl.setOwner(aclowner); - } else { - throw new FalconException("ACL owner extension property cannot be null or empty when authorization is " + if (SecurityUtil.isAuthorizationEnabled() && (StringUtils.isEmpty(aclOwner) || StringUtils.isEmpty(aclGroup) + || StringUtils.isEmpty(aclPermission))) { + throw new FalconException("ACL extension properties cannot be null or empty when authorization is " + "enabled"); } - String aclGroup = extensionProperties.getProperty(ExtensionProperties.JOB_ACL_GROUP.getName()); + if (StringUtils.isNotEmpty(aclOwner)) { + acl.setOwner(aclOwner); + } + if (StringUtils.isNotEmpty(aclGroup)) { acl.setGroup(aclGroup); - } else { - throw new FalconException("ACL group extension property cannot be null or empty when authorization is " - + "enabled"); } - String aclPermission = extensionProperties.getProperty(ExtensionProperties.JOB_ACL_PERMISSION.getName()); if (StringUtils.isNotEmpty(aclPermission)) { acl.setPermission(aclPermission); - } else { - throw new FalconException("ACL permission extension property cannot be null or empty when authorization is " - + "enabled"); } }
