Updated Branches: refs/heads/sqoop2 c4ddeb7ff -> 0a0a65a29
SQOOP-895: Sqoop2: Do not serialize framework and connector configurations into mapreduce configuration object (Jarek Jarcec Cecho via Kate Ting) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/0a0a65a2 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/0a0a65a2 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/0a0a65a2 Branch: refs/heads/sqoop2 Commit: 0a0a65a29fcd01251e74e63e8963f8c250e12668 Parents: c4ddeb7 Author: Kate Ting <[email protected]> Authored: Sun Apr 14 18:27:58 2013 -0400 Committer: Kate Ting <[email protected]> Committed: Sun Apr 14 18:27:58 2013 -0400 ---------------------------------------------------------------------- .../java/org/apache/sqoop/job/JobConstants.java | 13 ++++++ .../apache/sqoop/job/mr/ConfigurationUtils.java | 31 ++++++++------- .../mapreduce/MapreduceSubmissionEngine.java | 23 ++++++----- 3 files changed, 43 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/0a0a65a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java index e16a2c4..e2b3ce8 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java @@ -17,6 +17,7 @@ */ package org.apache.sqoop.job; +import org.apache.hadoop.io.Text; import org.apache.sqoop.core.ConfigurationConstants; public final class JobConstants extends Constants { @@ -67,15 +68,27 @@ public final class JobConstants extends Constants { public static final String JOB_CONFIG_CONNECTOR_CONNECTION = PREFIX_JOB_CONFIG + "config.connector.connection"; + public static final Text JOB_CONFIG_CONNECTOR_CONNECTION_KEY = + new Text(JOB_CONFIG_CONNECTOR_CONNECTION); + public static final String JOB_CONFIG_CONNECTOR_JOB = PREFIX_JOB_CONFIG + "config.connector.job"; + public static final Text JOB_CONFIG_CONNECTOR_JOB_KEY = + new Text(JOB_CONFIG_CONNECTOR_JOB); + public static final String JOB_CONFIG_FRAMEWORK_CONNECTION = PREFIX_JOB_CONFIG + "config.framework.connection"; + public static final Text JOB_CONFIG_FRAMEWORK_CONNECTION_KEY = + new Text(JOB_CONFIG_FRAMEWORK_CONNECTION); + public static final String JOB_CONFIG_FRAMEWORK_JOB = PREFIX_JOB_CONFIG + "config.framework.job"; + public static final Text JOB_CONFIG_FRAMEWORK_JOB_KEY = + new Text(JOB_CONFIG_FRAMEWORK_JOB); + public static final String PREFIX_CONNECTOR_CONTEXT = PREFIX_JOB_CONFIG + "connector.context."; http://git-wip-us.apache.org/repos/asf/sqoop/blob/0a0a65a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java index 4aa2128..64ec437 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java @@ -18,6 +18,8 @@ package org.apache.sqoop.job.mr; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.model.FormUtils; import org.apache.sqoop.model.MJob; @@ -33,48 +35,49 @@ public final class ConfigurationUtils { } public static Object getConnectorConnection(Configuration configuration) { - return loadConfiguration(configuration, + return loadConfiguration((JobConf) configuration, JobConstants.JOB_CONFIG_CLASS_CONNECTOR_CONNECTION, - JobConstants.JOB_CONFIG_CONNECTOR_CONNECTION); + JobConstants.JOB_CONFIG_CONNECTOR_CONNECTION_KEY); } public static Object getConnectorJob(Configuration configuration) { - return loadConfiguration(configuration, + return loadConfiguration((JobConf) configuration, JobConstants.JOB_CONFIG_CLASS_CONNECTOR_JOB, - JobConstants.JOB_CONFIG_CONNECTOR_JOB); + JobConstants.JOB_CONFIG_CONNECTOR_JOB_KEY); } public static Object getFrameworkConnection(Configuration configuration) { - return loadConfiguration(configuration, + return loadConfiguration((JobConf) configuration, JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION, - JobConstants.JOB_CONFIG_FRAMEWORK_CONNECTION); + JobConstants.JOB_CONFIG_FRAMEWORK_CONNECTION_KEY); } public static Object getFrameworkJob(Configuration configuration) { - return loadConfiguration(configuration, + return loadConfiguration((JobConf) configuration, JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_JOB, - JobConstants.JOB_CONFIG_FRAMEWORK_JOB); + JobConstants.JOB_CONFIG_FRAMEWORK_JOB_KEY); } /** - * Load configuration instance serialized in Hadoop configuration object - * @param configuration Hadoop configuration object associated with the job + * Load configuration instance serialized in Hadoop credentials cache. + * + * @param configuration JobConf object associated with the job * @param classProperty Property with stored configuration class name * @param valueProperty Property with stored JSON representation of the * configuration object * @return New instance with loaded data */ - private static Object loadConfiguration(Configuration configuration, - String classProperty, - String valueProperty) { + private static Object loadConfiguration(JobConf configuration, String classProperty, Text valueProperty) { // Create new instance of configuration class Object object = ClassUtils.instantiate(configuration.get(classProperty)); if(object == null) { return null; } + String json = new String(configuration.getCredentials().getSecretKey(valueProperty)); + // Fill it with JSON data - FormUtils.fillValues(configuration.get(valueProperty), object); + FormUtils.fillValues(json, object); // And give it back return object; http://git-wip-us.apache.org/repos/asf/sqoop/blob/0a0a65a2/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java ---------------------------------------------------------------------- diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java index 77f30ea..001fb02 100644 --- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java +++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java @@ -19,6 +19,7 @@ package org.apache.sqoop.submission.mapreduce; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobID; @@ -26,6 +27,7 @@ import org.apache.hadoop.mapred.JobStatus; import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.security.Credentials; import org.apache.log4j.Logger; import org.apache.sqoop.common.MapContext; import org.apache.sqoop.common.SqoopException; @@ -181,16 +183,6 @@ public class MapreduceSubmissionEngine extends SubmissionEngine { configuration.set(JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_JOB, request.getConfigFrameworkJob().getClass().getName()); - // And finally configuration data - configuration.set(JobConstants.JOB_CONFIG_CONNECTOR_CONNECTION, - FormUtils.toJson(request.getConfigConnectorConnection())); - configuration.set(JobConstants.JOB_CONFIG_CONNECTOR_JOB, - FormUtils.toJson(request.getConfigConnectorJob())); - configuration.set(JobConstants.JOB_CONFIG_FRAMEWORK_CONNECTION, - FormUtils.toJson(request.getConfigFrameworkConnection())); - configuration.set(JobConstants.JOB_CONFIG_FRAMEWORK_JOB, - FormUtils.toJson(request.getConfigFrameworkConnection())); - // Set up notification URL if it's available if(request.getNotificationUrl() != null) { configuration.set("job.end.notification.url", request.getNotificationUrl()); @@ -217,6 +209,17 @@ public class MapreduceSubmissionEngine extends SubmissionEngine { try { Job job = new Job(configuration); + // And finally put all configuration objects to credentials cache + Credentials credentials = job.getCredentials(); + credentials.addSecretKey(JobConstants.JOB_CONFIG_CONNECTOR_CONNECTION_KEY, + FormUtils.toJson(request.getConfigConnectorConnection()).getBytes()); + credentials.addSecretKey(JobConstants.JOB_CONFIG_CONNECTOR_JOB_KEY, + FormUtils.toJson(request.getConfigConnectorJob()).getBytes()); + credentials.addSecretKey(JobConstants.JOB_CONFIG_FRAMEWORK_CONNECTION_KEY, + FormUtils.toJson(request.getConfigFrameworkConnection()).getBytes()); + credentials.addSecretKey(JobConstants.JOB_CONFIG_FRAMEWORK_JOB_KEY, + FormUtils.toJson(request.getConfigFrameworkConnection()).getBytes()); + if(request.getJobName() != null) { job.setJobName("Sqoop: " + request.getJobName()); } else {
