Repository: falcon Updated Branches: refs/heads/master fa66c17e9 -> c404da28c
FALCON-1880 To support TDE encryption, add --skipcrccheck to distcp options for HiveDR. Add --skipcrccheck to distcp options for HiveDR. Author: bvellanki <[email protected]> Reviewers: "Peeyush Bishnoi <[email protected]>, Ajay Yadava <[email protected]>, Venkatesan Ramachandran <[email protected]>" Closes #82 from bvellanki/master Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/c404da28 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/c404da28 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/c404da28 Branch: refs/heads/master Commit: c404da28cc5eeefa158ea5a2aca38b4b2cf731ee Parents: fa66c17 Author: bvellanki <[email protected]> Authored: Wed Apr 6 15:16:36 2016 -0700 Committer: bvellanki <[email protected]> Committed: Wed Apr 6 15:16:36 2016 -0700 ---------------------------------------------------------------------- addons/hivedr/pom.xml | 6 +++ .../java/org/apache/falcon/hive/HiveDRArgs.java | 3 ++ .../org/apache/falcon/hive/HiveDROptions.java | 7 ++- .../java/org/apache/falcon/hive/HiveDRTool.java | 4 +- .../org/apache/falcon/hive/util/EventUtils.java | 51 ++++++++++---------- .../hive-disaster-recovery-secure-workflow.xml | 6 +++ .../hive-disaster-recovery-secure.properties | 2 + .../recipe/HiveReplicationRecipeTool.java | 4 ++ .../HiveReplicationRecipeToolOptions.java | 1 + docs/src/site/twiki/HiveDR.twiki | 6 +++ 10 files changed, 61 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/c404da28/addons/hivedr/pom.xml ---------------------------------------------------------------------- diff --git a/addons/hivedr/pom.xml b/addons/hivedr/pom.xml index 37dc5c9..adf0459 100644 --- a/addons/hivedr/pom.xml +++ b/addons/hivedr/pom.xml @@ -172,6 +172,12 @@ <artifactId>hadoop-distcp</artifactId> <scope>compile</scope> </dependency> + + <dependency> + <groupId>org.apache.derby</groupId> + <artifactId>derby</artifactId> + <version>${derby.version}</version> + </dependency> </dependencies> </profile> </profiles> http://git-wip-us.apache.org/repos/asf/falcon/blob/c404da28/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java index 5490232..c9ad47e 100644 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java @@ -64,6 +64,9 @@ public enum HiveDRArgs { REPLICATION_MAX_MAPS("replicationMaxMaps", "number of maps", false), DISTCP_MAX_MAPS("distcpMaxMaps", "number of maps", false), + // Set to true if TDE is enabled + TDE_ENCRYPTION_ENABLED("tdeEncryptionEnabled", "Set to true if TDE encryption is enabled", false), + // Map Bandwidth DISTCP_MAP_BANDWIDTH("distcpMapBandwidth", "map bandwidth in mb", false), http://git-wip-us.apache.org/repos/asf/falcon/blob/c404da28/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java index 28515e4..868ec8d 100644 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java @@ -120,7 +120,7 @@ public class HiveDROptions { } public int getMaxEvents() { - return Integer.valueOf(context.get(HiveDRArgs.MAX_EVENTS)); + return Integer.parseInt(context.get(HiveDRArgs.MAX_EVENTS)); } public boolean shouldKeepHistory() { @@ -147,6 +147,11 @@ public class HiveDROptions { return context.get(HiveDRArgs.EXECUTION_STAGE); } + public boolean isTDEEncryptionEnabled() { + return StringUtils.isEmpty(context.get(HiveDRArgs.TDE_ENCRYPTION_ENABLED)) + ? false : Boolean.valueOf(context.get(HiveDRArgs.TDE_ENCRYPTION_ENABLED)); + } + public boolean shouldBlock() { return true; } http://git-wip-us.apache.org/repos/asf/falcon/blob/c404da28/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java index e141800..17eec22 100644 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java @@ -271,8 +271,8 @@ public class HiveDRTool extends Configured implements Tool { private String sourceEvents() throws Exception { MetaStoreEventSourcer defaultSourcer = null; String inputFilename = null; - String lastEventsIdFile = FileUtils.DEFAULT_EVENT_STORE_PATH +File.separator+inputOptions.getJobName()+"/" - +inputOptions.getJobName()+".id"; + String lastEventsIdFile = FileUtils.DEFAULT_EVENT_STORE_PATH + File.separator + + inputOptions.getJobName() + File.separator + inputOptions.getJobName() + ".id"; Map<String, Long> lastEventsIdMap = getLastDBTableEvents(new Path(lastEventsIdFile)); try { HCatClient sourceMetastoreClient = HiveMetastoreUtils.initializeHiveMetaStoreClient( http://git-wip-us.apache.org/repos/asf/falcon/blob/c404da28/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java index d075bfb..3b088f7 100644 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java @@ -67,10 +67,12 @@ public class EventUtils { private String jobNN = null; private String jobNNKerberosPrincipal = null; private String targetHiveServer2Uri = null; + private String sourceStagingPath = null; private String targetStagingPath = null; private String targetNN = null; private String targetNNKerberosPrincipal = null; - private String fullyQualifiedTargetStagingPath = null; + private String sourceStagingUri = null; + private String targetStagingUri = null; private List<Path> sourceCleanUpList = null; private List<Path> targetCleanUpList = null; private static final Logger LOG = LoggerFactory.getLogger(EventUtils.class); @@ -93,6 +95,8 @@ public class EventUtils { sourceDatabase = conf.get(HiveDRArgs.SOURCE_DATABASE.getName()); sourceNN = conf.get(HiveDRArgs.SOURCE_NN.getName()); sourceNNKerberosPrincipal = conf.get(HiveDRArgs.SOURCE_NN_KERBEROS_PRINCIPAL.getName()); + sourceStagingPath = conf.get(HiveDRArgs.SOURCE_STAGING_PATH.getName()) + + File.separator + conf.get(HiveDRArgs.JOB_NAME.getName()); jobNN = conf.get(HiveDRArgs.JOB_CLUSTER_NN.getName()); jobNNKerberosPrincipal = conf.get(HiveDRArgs.JOB_CLUSTER_NN_KERBEROS_PRINCIPAL.getName()); targetHiveServer2Uri = conf.get(HiveDRArgs.TARGET_HS2_URI.getName()); @@ -139,7 +143,8 @@ public class EventUtils { public void initializeFS() throws IOException { LOG.info("Initializing staging directory"); - fullyQualifiedTargetStagingPath = new Path(targetNN, targetStagingPath).toString(); + sourceStagingUri = new Path(sourceNN, sourceStagingPath).toString(); + targetStagingUri = new Path(targetNN, targetStagingPath).toString(); sourceFileSystem = FileSystem.get(FileUtils.getConfiguration(sourceNN, sourceNNKerberosPrincipal)); jobFileSystem = FileSystem.get(FileUtils.getConfiguration(jobNN, jobNNKerberosPrincipal)); targetFileSystem = FileSystem.get(FileUtils.getConfiguration(targetNN, targetNNKerberosPrincipal)); @@ -177,7 +182,7 @@ public class EventUtils { LOG.info("Process the export statements for db {} table {}", dbName, tableName); processCommands(exportEventStr, dbName, tableName, sourceStatement, sourceCleanUpList, false); if (!sourceCleanUpList.isEmpty()) { - invokeCopy(sourceCleanUpList); + invokeCopy(); } } } else if (conf.get(HiveDRArgs.EXECUTION_STAGE.getName()) @@ -310,11 +315,11 @@ public class EventUtils { } } - public void invokeCopy(List<Path> srcStagingPaths) throws Exception { - DistCpOptions options = getDistCpOptions(srcStagingPaths); + public void invokeCopy() throws Exception { + DistCpOptions options = getDistCpOptions(); DistCp distCp = new DistCp(conf, options); - LOG.info("Started DistCp with source Path: {} \ttarget path: {}", StringUtils.join(srcStagingPaths.toArray()), - fullyQualifiedTargetStagingPath); + LOG.info("Started DistCp with source Path: {} \ttarget path: {}", sourceStagingUri, targetStagingUri); + Job distcpJob = distCp.execute(); LOG.info("Distp Hadoop job: {}", distcpJob.getJobID().toString()); LOG.info("Completed DistCp"); @@ -323,27 +328,21 @@ public class EventUtils { } } - public DistCpOptions getDistCpOptions(List<Path> srcStagingPaths) { - /* - * Add the fully qualified sourceNameNode to srcStagingPath uris. This will - * ensure DistCp will succeed when the job is run on target cluster. - */ - List<Path> fullyQualifiedSrcStagingPaths = new ArrayList<Path>(); - for (Path srcPath : srcStagingPaths) { - fullyQualifiedSrcStagingPaths.add(new Path(sourceNN, srcPath.toString())); + public DistCpOptions getDistCpOptions() { + // DistCpOptions expects the first argument to be a file OR a list of Paths + List<Path> sourceUris=new ArrayList<Path>(); + sourceUris.add(new Path(sourceStagingUri)); + DistCpOptions distcpOptions = new DistCpOptions(sourceUris, new Path(targetStagingUri)); + + // setSyncFolder(true) ensures directory structure is maintained when source is copied to target + distcpOptions.setSyncFolder(true); + // skipCRCCheck if TDE is enabled. + if (Boolean.parseBoolean(conf.get(HiveDRArgs.TDE_ENCRYPTION_ENABLED.getName()))) { + distcpOptions.setSkipCRC(true); } - fullyQualifiedSrcStagingPaths.toArray(new Path[fullyQualifiedSrcStagingPaths.size()]); - - DistCpOptions distcpOptions = new DistCpOptions(fullyQualifiedSrcStagingPaths, - new Path(fullyQualifiedTargetStagingPath)); - /* setSyncFolder to false to retain dir structure as in source at the target. If set to true all files will be - copied to the same staging sir at target resulting in DuplicateFileException in DistCp. - */ - - distcpOptions.setSyncFolder(false); distcpOptions.setBlocking(true); - distcpOptions.setMaxMaps(Integer.valueOf(conf.get(HiveDRArgs.DISTCP_MAX_MAPS.getName()))); - distcpOptions.setMapBandwidth(Integer.valueOf(conf.get(HiveDRArgs.DISTCP_MAP_BANDWIDTH.getName()))); + distcpOptions.setMaxMaps(Integer.parseInt(conf.get(HiveDRArgs.DISTCP_MAX_MAPS.getName()))); + distcpOptions.setMapBandwidth(Integer.parseInt(conf.get(HiveDRArgs.DISTCP_MAP_BANDWIDTH.getName()))); return distcpOptions; } http://git-wip-us.apache.org/repos/asf/falcon/blob/c404da28/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml ---------------------------------------------------------------------- diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml index 0494cf6..2d6b8be 100644 --- a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml +++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml @@ -142,6 +142,8 @@ <arg>${clusterForJobRunWriteEP}</arg> <arg>-clusterForJobNNKerberosPrincipal</arg> <arg>${clusterForJobNNKerberosPrincipal}</arg> + <arg>-tdeEncryptionEnabled</arg> + <arg>${tdeEncryptionEnabled}</arg> <arg>-drJobName</arg> <arg>${drJobName}-${nominalTime}</arg> <arg>-executionStage</arg> @@ -240,6 +242,8 @@ <arg>${clusterForJobRunWriteEP}</arg> <arg>-clusterForJobNNKerberosPrincipal</arg> <arg>${clusterForJobNNKerberosPrincipal}</arg> + <arg>-tdeEncryptionEnabled</arg> + <arg>${tdeEncryptionEnabled}</arg> <arg>-drJobName</arg> <arg>${drJobName}-${nominalTime}</arg> <arg>-executionStage</arg> @@ -340,6 +344,8 @@ <arg>${clusterForJobRunWriteEP}</arg> <arg>-clusterForJobNNKerberosPrincipal</arg> <arg>${clusterForJobNNKerberosPrincipal}</arg> + <arg>-tdeEncryptionEnabled</arg> + <arg>${tdeEncryptionEnabled}</arg> <arg>-drJobName</arg> <arg>${drJobName}-${nominalTime}</arg> <arg>-executionStage</arg> http://git-wip-us.apache.org/repos/asf/falcon/blob/c404da28/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties ---------------------------------------------------------------------- diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties index 8d00bb5..331d57e 100644 --- a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties +++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties @@ -102,6 +102,8 @@ replicationMaxMaps=5 distcpMaxMaps=1 # Change it to specify the bandwidth in MB for each mapper in DistCP distcpMapBandwidth=100 +# Set this flag to true if TDE encryption is enabled on source and target +tdeEncryptionEnabled=false ##### Email Notification for Falcon instance completion falcon.recipe.notification.type=email http://git-wip-us.apache.org/repos/asf/falcon/blob/c404da28/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java b/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java index 8b39673..3df89d3 100644 --- a/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java +++ b/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java @@ -129,6 +129,10 @@ public class HiveReplicationRecipeTool implements Recipe { additionalProperties.put(HiveReplicationRecipeToolOptions.CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL.getName(), recipeProperties.getProperty(RecipeToolOptions.RECIPE_NN_PRINCIPAL.getName())); } + if (StringUtils.isEmpty( + recipeProperties.getProperty(HiveReplicationRecipeToolOptions.TDE_ENCRYPTION_ENABLED.getName()))) { + additionalProperties.put(HiveReplicationRecipeToolOptions.TDE_ENCRYPTION_ENABLED.getName(), "false"); + } return additionalProperties; } http://git-wip-us.apache.org/repos/asf/falcon/blob/c404da28/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java b/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java index ec0465d..3d69d6e 100644 --- a/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java +++ b/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java @@ -54,6 +54,7 @@ public enum HiveReplicationRecipeToolOptions { CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL("clusterForJobNNKerberosPrincipal", "Write EP of cluster on which replication job runs", false), CLUSTER_FOR_JOB_RUN_WRITE_EP("clusterForJobRunWriteEP", "Write EP of cluster on which replication job runs", false), + TDE_ENCRYPTION_ENABLED("tdeEncryptionEnabled", "Set to true if TDE encryption is enabled", false), HIVE_DR_JOB_NAME("drJobName", "Unique hive DR job name", false); private final String name; http://git-wip-us.apache.org/repos/asf/falcon/blob/c404da28/docs/src/site/twiki/HiveDR.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/HiveDR.twiki b/docs/src/site/twiki/HiveDR.twiki index a8f6aee..cf35694 100644 --- a/docs/src/site/twiki/HiveDR.twiki +++ b/docs/src/site/twiki/HiveDR.twiki @@ -53,6 +53,11 @@ Following is the prerequisites to use Hive DR in Falcon conf client.properties. Now update the copied recipe properties file with required attributes to replicate metadata and data from source cluster to destination cluster for Hive DR. + * *Note : HiveDR on TDE encrypted clusters* + When submitting HiveDR recipe in a kerberos secured setup, it is possible that the source and target staging directories + are encrypted using Transparent Data Encryption (TDE). If your cluster dirs are TDE encrypted, please set + "tdeEncryptionEnabled=true" in the recipe properties file. Default value for this property is "false". + ---+++ Submit Hive DR recipe After updating the recipe properties file with required attributes in directory path or in falcon.recipe.path, there are two ways of submitting the Hive DR recipe: @@ -72,3 +77,4 @@ Following is the prerequisites to use Hive DR *Note:* * Recipe properties file, workflow file and template file name must match to the recipe name, it must be unique and in the same directory. * If kerberos security is enabled on cluster, use the secure templates for Hive DR from $FALCON_HOME/data-mirroring/hive-disaster-recovery . +
