Repository: sqoop Updated Branches: refs/heads/sqoop2 a78adb468 -> b35d7573d
SQOOP-1872: IDF API should expose a method to add dependent jars (Veena Basavaraj via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/b35d7573 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/b35d7573 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/b35d7573 Branch: refs/heads/sqoop2 Commit: b35d7573da8e95928343fe41b34f622ff89d4e22 Parents: a78adb4 Author: Jarek Jarcec Cecho <[email protected]> Authored: Sat Dec 13 14:29:15 2014 -0800 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Sat Dec 13 14:29:15 2014 -0800 ---------------------------------------------------------------------- .../idf/CSVIntermediateDataFormat.java | 32 ++++++++++++++++++++ .../connector/idf/IntermediateDataFormat.java | 11 +++++++ .../org/apache/sqoop/driver/JobManager.java | 25 ++++++--------- 3 files changed, 53 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/b35d7573/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java index b377e2d..dbe193d 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java @@ -23,10 +23,24 @@ import static org.apache.sqoop.connector.common.SqoopIDFUtils.*; import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.common.FileFormat; import org.apache.sqoop.schema.Schema; import org.apache.sqoop.schema.type.AbstractComplexListType; import org.apache.sqoop.schema.type.Column; import org.apache.sqoop.schema.type.ColumnType; +import org.apache.sqoop.schema.type.FixedPoint; +import org.apache.sqoop.schema.type.FloatingPoint; +import org.apache.sqoop.utils.ClassUtils; +import org.joda.time.DateTime; +import org.joda.time.LocalDate; +import org.joda.time.LocalDateTime; +import org.joda.time.LocalTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.JSONValue; +import org.json.simple.parser.JSONParser; import java.io.DataInput; import java.io.DataOutput; @@ -381,4 +395,22 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> { public String toString() { return data; } + + /** + * {@inheritDoc} + */ + @Override + public List<String> getJars() { + + List<String> jars = super.getJars(); + // Add JODA classes for IDF date/time handling + jars.add(ClassUtils.jarForClass(LocalDate.class)); + jars.add(ClassUtils.jarForClass(LocalDateTime.class)); + jars.add(ClassUtils.jarForClass(DateTime.class)); + jars.add(ClassUtils.jarForClass(LocalTime.class)); + // Add JSON parsing jar + jars.add(ClassUtils.jarForClass(JSONValue.class)); + return jars; + } + } http://git-wip-us.apache.org/repos/asf/sqoop/blob/b35d7573/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java index eca7c58..93698a8 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java @@ -23,6 +23,8 @@ import org.apache.sqoop.schema.Schema; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.LinkedList; +import java.util.List; /** * Abstract class representing a pluggable intermediate data format Sqoop @@ -133,4 +135,13 @@ public abstract class IntermediateDataFormat<T> { * @throws IOException */ public abstract void read(DataInput in) throws IOException; + + /** + * Provide the external jars that the IDF depends on + * @return list of jars + */ + public List<String> getJars() { + return new LinkedList<String>(); + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/b35d7573/core/src/main/java/org/apache/sqoop/driver/JobManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java index 2709512..01073d4 100644 --- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java +++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java @@ -48,11 +48,6 @@ import org.apache.sqoop.schema.Schema; import org.apache.sqoop.submission.SubmissionStatus; import org.apache.sqoop.submission.counter.Counters; import org.apache.sqoop.utils.ClassUtils; -import org.joda.time.DateTime; -import org.joda.time.LocalDate; -import org.joda.time.LocalDateTime; -import org.joda.time.LocalTime; -import org.json.simple.JSONValue; public class JobManager implements Reconfigurable { /** @@ -291,8 +286,6 @@ public class JobManager implements Reconfigurable { // NOTE: the following is a blocking call boolean success = submissionEngine.submit(jobRequest); if (!success) { - // TODO(jarcec): We might need to catch all exceptions here to ensure - // that Destroyer will be executed in all cases. invokeDestroyerOnJobFailure(jobRequest); mSubmission.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT); } @@ -359,7 +352,7 @@ public class JobManager implements Reconfigurable { jobRequest.setNotificationUrl(notificationBaseUrl + jobId); Class<? extends IntermediateDataFormat<?>> dataFormatClass = fromConnector .getIntermediateDataFormat(); - jobRequest.setIntermediateDataFormat(fromConnector.getIntermediateDataFormat()); + jobRequest.setIntermediateDataFormat(dataFormatClass); jobRequest.setFrom(fromConnector.getFrom()); jobRequest.setTo(toConnector.getTo()); @@ -369,6 +362,7 @@ public class JobManager implements Reconfigurable { addConnectorJars(jobRequest, fromConnector, toConnector, dataFormatClass); addConnectorInitializerJars(jobRequest, Direction.FROM); addConnectorInitializerJars(jobRequest, Direction.TO); + addIDFJars(jobRequest); // call the intialize method initializeConnector(jobRequest, Direction.FROM); @@ -398,13 +392,6 @@ public class JobManager implements Reconfigurable { jobRequest.addJarForClass(SqoopConnector.class); // Execution engine jar jobRequest.addJarForClass(executionEngine.getClass()); - // Extra libraries that Sqoop code requires - jobRequest.addJarForClass(JSONValue.class); - // Add JODA classes for IDF date/time handling - jobRequest.addJarForClass(LocalDate.class); - jobRequest.addJarForClass(LocalDateTime.class); - jobRequest.addJarForClass(DateTime.class); - jobRequest.addJarForClass(LocalTime.class); } MSubmission createJobSubmission(HttpEventContext ctx, long jobId) { @@ -469,6 +456,14 @@ public class JobManager implements Reconfigurable { } @SuppressWarnings({ "unchecked", "rawtypes" }) + // TODO:SQOOP-1882 , should add the FROM and TO connector IDF jars + private void addIDFJars(JobRequest jobRequest) { + Class<? extends IntermediateDataFormat> idfClass = jobRequest.getIntermediateDataFormat(); + IntermediateDataFormat idf = (IntermediateDataFormat) ClassUtils.instantiate(idfClass); + jobRequest.addJars(idf.getJars()); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) private void addConnectorInitializerJars(JobRequest jobRequest, Direction direction) { Initializer initializer = getConnectorInitializer(jobRequest, direction);
