Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 a78adb468 -> b35d7573d


SQOOP-1872: IDF API should expose a method to add dependent jars

(Veena Basavaraj via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/b35d7573
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/b35d7573
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/b35d7573

Branch: refs/heads/sqoop2
Commit: b35d7573da8e95928343fe41b34f622ff89d4e22
Parents: a78adb4
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Sat Dec 13 14:29:15 2014 -0800
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Sat Dec 13 14:29:15 2014 -0800

----------------------------------------------------------------------
 .../idf/CSVIntermediateDataFormat.java          | 32 ++++++++++++++++++++
 .../connector/idf/IntermediateDataFormat.java   | 11 +++++++
 .../org/apache/sqoop/driver/JobManager.java     | 25 ++++++---------
 3 files changed, 53 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/b35d7573/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git 
a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
 
b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
index b377e2d..dbe193d 100644
--- 
a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
+++ 
b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
@@ -23,10 +23,24 @@ import static 
org.apache.sqoop.connector.common.SqoopIDFUtils.*;
 import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.common.FileFormat;
 import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.schema.type.AbstractComplexListType;
 import org.apache.sqoop.schema.type.Column;
 import org.apache.sqoop.schema.type.ColumnType;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
+import org.apache.sqoop.utils.ClassUtils;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalDateTime;
+import org.joda.time.LocalTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+import org.json.simple.parser.JSONParser;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -381,4 +395,22 @@ public class CSVIntermediateDataFormat extends 
IntermediateDataFormat<String> {
   public String toString() {
     return data;
   }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public List<String> getJars() {
+
+    List<String> jars = super.getJars();
+    // Add the jars containing the Joda-Time classes used for IDF date/time handling
+    jars.add(ClassUtils.jarForClass(LocalDate.class));
+    jars.add(ClassUtils.jarForClass(LocalDateTime.class));
+    jars.add(ClassUtils.jarForClass(DateTime.class));
+    jars.add(ClassUtils.jarForClass(LocalTime.class));
+    // Add JSON parsing jar
+    jars.add(ClassUtils.jarForClass(JSONValue.class));
+    return jars;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b35d7573/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
----------------------------------------------------------------------
diff --git 
a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
 
b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
index eca7c58..93698a8 100644
--- 
a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
+++ 
b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
@@ -23,6 +23,8 @@ import org.apache.sqoop.schema.Schema;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
 
 /**
  * Abstract class representing a pluggable intermediate data format Sqoop
@@ -133,4 +135,13 @@ public abstract class IntermediateDataFormat<T> {
    * @throws IOException
    */
   public abstract void read(DataInput in) throws IOException;
+
+  /**
+   * Provides the external jars that this IDF depends on.
+   * @return list of jars; this base implementation returns a new, empty list
+   */
+  public List<String> getJars() {
+    return new LinkedList<String>();
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b35d7573/core/src/main/java/org/apache/sqoop/driver/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java 
b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
index 2709512..01073d4 100644
--- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
@@ -48,11 +48,6 @@ import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.submission.SubmissionStatus;
 import org.apache.sqoop.submission.counter.Counters;
 import org.apache.sqoop.utils.ClassUtils;
-import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalDateTime;
-import org.joda.time.LocalTime;
-import org.json.simple.JSONValue;
 
 public class JobManager implements Reconfigurable {
   /**
@@ -291,8 +286,6 @@ public class JobManager implements Reconfigurable {
       // NOTE: the following is a blocking call
       boolean success = submissionEngine.submit(jobRequest);
       if (!success) {
-        // TODO(jarcec): We might need to catch all exceptions here to ensure
-        // that Destroyer will be executed in all cases.
         invokeDestroyerOnJobFailure(jobRequest);
         mSubmission.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT);
       }
@@ -359,7 +352,7 @@ public class JobManager implements Reconfigurable {
     jobRequest.setNotificationUrl(notificationBaseUrl + jobId);
     Class<? extends IntermediateDataFormat<?>> dataFormatClass = fromConnector
         .getIntermediateDataFormat();
-    
jobRequest.setIntermediateDataFormat(fromConnector.getIntermediateDataFormat());
+    jobRequest.setIntermediateDataFormat(dataFormatClass);
 
     jobRequest.setFrom(fromConnector.getFrom());
     jobRequest.setTo(toConnector.getTo());
@@ -369,6 +362,7 @@ public class JobManager implements Reconfigurable {
     addConnectorJars(jobRequest, fromConnector, toConnector, dataFormatClass);
     addConnectorInitializerJars(jobRequest, Direction.FROM);
     addConnectorInitializerJars(jobRequest, Direction.TO);
+    addIDFJars(jobRequest);
 
     // call the intialize method
     initializeConnector(jobRequest, Direction.FROM);
@@ -398,13 +392,6 @@ public class JobManager implements Reconfigurable {
     jobRequest.addJarForClass(SqoopConnector.class);
     // Execution engine jar
     jobRequest.addJarForClass(executionEngine.getClass());
-    // Extra libraries that Sqoop code requires
-    jobRequest.addJarForClass(JSONValue.class);
-    // Add JODA classes for IDF date/time handling
-    jobRequest.addJarForClass(LocalDate.class);
-    jobRequest.addJarForClass(LocalDateTime.class);
-    jobRequest.addJarForClass(DateTime.class);
-    jobRequest.addJarForClass(LocalTime.class);
   }
 
   MSubmission createJobSubmission(HttpEventContext ctx, long jobId) {
@@ -469,6 +456,14 @@ public class JobManager implements Reconfigurable {
   }
 
   @SuppressWarnings({ "unchecked", "rawtypes" })
+  // TODO(SQOOP-1882): should add both the FROM and TO connector IDF jars
+  private void addIDFJars(JobRequest jobRequest) {
+    Class<? extends IntermediateDataFormat> idfClass = 
jobRequest.getIntermediateDataFormat();
+    IntermediateDataFormat idf = (IntermediateDataFormat) 
ClassUtils.instantiate(idfClass);
+    jobRequest.addJars(idf.getJars());
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
   private void addConnectorInitializerJars(JobRequest jobRequest, Direction 
direction) {
 
     Initializer initializer = getConnectorInitializer(jobRequest, direction);

Reply via email to