Repository: sqoop Updated Branches: refs/heads/SQOOP-1367 71279480e -> 5c29a2a29
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java index 01c32e4..e457cff 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java @@ -36,7 +36,7 @@ import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; import org.apache.sqoop.connector.idf.IntermediateDataFormat; import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.MapreduceExecutionError; -import org.apache.sqoop.job.PrefixContext; +import org.apache.sqoop.common.PrefixContext; import org.apache.sqoop.job.etl.Loader; import org.apache.sqoop.job.etl.LoaderContext; import org.apache.sqoop.etl.io.DataReader; @@ -72,7 +72,13 @@ public class SqoopOutputFormatLoadExecutor { producer = new SqoopRecordWriter(); data = (IntermediateDataFormat) ClassUtils.instantiate(context .getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT)); - data.setSchema(ConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration())); + + Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration()); + if (schema==null) { + schema = ConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration()); + } + + data.setSchema(schema); } public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() { http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java ---------------------------------------------------------------------- 
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java b/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java index 0f2a882..f70e9bd 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java @@ -17,18 +17,9 @@ */ package org.apache.sqoop.execution.mapreduce; -import org.apache.sqoop.common.MutableMapContext; //import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; -import org.apache.sqoop.framework.SubmissionRequest; -import org.apache.sqoop.framework.configuration.ImportJobConfiguration; -import org.apache.sqoop.framework.configuration.OutputCompression; -import org.apache.sqoop.framework.configuration.OutputFormat; -import org.apache.sqoop.job.JobConstants; -import org.apache.sqoop.job.etl.Destroyer; -import org.apache.sqoop.job.etl.Extractor; -import org.apache.sqoop.job.etl.Initializer; -import org.apache.sqoop.job.etl.Partitioner; -import org.junit.Test; +//import org.apache.sqoop.framework.configuration.OutputCompression; +//import org.apache.sqoop.framework.configuration.OutputFormat; import static junit.framework.TestCase.assertEquals; http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java index e460c3e..2accf77 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java @@ -17,40 +17,11 @@ */ package org.apache.sqoop.job; -import 
java.io.BufferedWriter; -import java.io.IOException; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.util.List; - import junit.framework.TestCase; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.SequenceFile.CompressionType; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.compress.BZip2Codec; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.mapreduce.Job; //import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; //import org.apache.sqoop.job.etl.HdfsExportExtractor; -import org.apache.sqoop.job.etl.HdfsExportPartitioner; -import org.apache.sqoop.job.etl.HdfsSequenceImportLoader; -import org.apache.sqoop.job.etl.Loader; -import org.apache.sqoop.job.etl.LoaderContext; -import org.apache.sqoop.job.etl.Partition; -import org.apache.sqoop.job.etl.PartitionerContext; -import org.apache.sqoop.job.io.Data; -import org.apache.sqoop.job.mr.ConfigurationUtils; -import org.apache.sqoop.job.mr.SqoopFileOutputFormat; -import org.apache.sqoop.model.MJob; -import org.apache.sqoop.schema.Schema; -import org.apache.sqoop.schema.type.FixedPoint; -import org.apache.sqoop.schema.type.FloatingPoint; -import org.junit.Test; +//import org.apache.sqoop.job.etl.HdfsExportPartitioner; public class TestHdfsExtract extends TestCase { http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java index 65e82b1..8eba049 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java +++ 
b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java @@ -17,40 +17,11 @@ */ package org.apache.sqoop.job; -import java.io.BufferedReader; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.LinkedList; -import java.util.List; - -import com.google.common.base.Charsets; import junit.framework.TestCase; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.util.ReflectionUtils; //import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; -import org.apache.sqoop.job.etl.Extractor; -import org.apache.sqoop.job.etl.ExtractorContext; -import org.apache.sqoop.job.etl.HdfsSequenceImportLoader; -import org.apache.sqoop.job.etl.HdfsTextImportLoader; -import org.apache.sqoop.job.etl.Partition; -import org.apache.sqoop.job.etl.Partitioner; -import org.apache.sqoop.job.etl.PartitionerContext; -import org.apache.sqoop.job.io.Data; -import org.apache.sqoop.job.mr.ConfigurationUtils; -import org.apache.sqoop.job.mr.SqoopFileOutputFormat; -import org.apache.sqoop.model.MJob; -import org.apache.sqoop.schema.Schema; -import org.apache.sqoop.schema.type.FixedPoint; -import org.apache.sqoop.schema.type.FloatingPoint; +//import org.apache.sqoop.job.etl.HdfsSequenceImportLoader; +//import org.apache.sqoop.job.etl.HdfsTextImportLoader; public class TestHdfsLoad extends TestCase { http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 38b4974..fae9fe8 100644 --- a/pom.xml +++ b/pom.xml @@ -301,6 +301,17 @@ limitations under the License. 
</dependency> <dependency> <groupId>org.apache.sqoop.connector</groupId> + <artifactId>sqoop-connector-hdfs</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.sqoop.connector</groupId> + <artifactId>sqoop-connector-hdfs</artifactId> + <type>test-jar</type> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.sqoop.connector</groupId> <artifactId>sqoop-connector-mysql-jdbc</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java ---------------------------------------------------------------------- diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java index f355ceb..88be9fb 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java @@ -27,12 +27,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.sql.Timestamp; import java.sql.Types; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; +import java.util.*; import javax.sql.DataSource; @@ -1711,14 +1706,16 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { throws SQLException { List<MJob> jobs = new ArrayList<MJob>(); ResultSet rsJob = null; - PreparedStatement formConnectorFetchStmt = null; + PreparedStatement toFormConnectorFetchStmt = null; + PreparedStatement fromFormConnectorFetchStmt = null; PreparedStatement formFrameworkFetchStmt = null; PreparedStatement inputFetchStmt = 
null; try { rsJob = stmt.executeQuery(); - formConnectorFetchStmt = conn.prepareStatement(STMT_FETCH_FORM_CONNECTOR); + toFormConnectorFetchStmt = conn.prepareStatement(STMT_FETCH_FORM_CONNECTOR); + fromFormConnectorFetchStmt = conn.prepareStatement(STMT_FETCH_FORM_CONNECTOR); formFrameworkFetchStmt = conn.prepareStatement(STMT_FETCH_FORM_FRAMEWORK); inputFetchStmt = conn.prepareStatement(STMT_FETCH_JOB_INPUT); @@ -1735,28 +1732,47 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { String updateBy = rsJob.getString(10); Date lastUpdateDate = rsJob.getTimestamp(11); - formConnectorFetchStmt.setLong(1, fromConnectorId); + fromFormConnectorFetchStmt.setLong(1, fromConnectorId); + toFormConnectorFetchStmt.setLong(1,toConnectorId); inputFetchStmt.setLong(1, id); //inputFetchStmt.setLong(1, XXX); // Will be filled by loadForms inputFetchStmt.setLong(3, id); - List<MForm> connectorConnForms = new ArrayList<MForm>(); + List<MForm> toConnectorConnForms = new ArrayList<MForm>(); + List<MForm> fromConnectorConnForms = new ArrayList<MForm>(); + List<MForm> frameworkConnForms = new ArrayList<MForm>(); List<MForm> frameworkJobForms = new ArrayList<MForm>(); - List<MForm> fromJobForms = new ArrayList<MForm>(); - List<MForm> toJobForms = new ArrayList<MForm>(); - loadConnectorForms(connectorConnForms, fromJobForms, toJobForms, - formConnectorFetchStmt, inputFetchStmt, 2); + // This looks confusing but our job has 2 connectors, each connector has two job forms + // To define the job, we need the TO job form of the TO connector + // and the FROM job form of the FROM connector + List<MForm> fromConnectorFromJobForms = new ArrayList<MForm>(); + List<MForm> fromConnectorToJobForms = new ArrayList<MForm>(); + List<MForm> toConnectorFromJobForms = new ArrayList<MForm>(); + List<MForm> toConnectorToJobForms = new ArrayList<MForm>(); + + + loadConnectorForms(fromConnectorConnForms, + fromConnectorFromJobForms, + fromConnectorToJobForms, + fromFormConnectorFetchStmt, 
inputFetchStmt, + 2); + loadConnectorForms(toConnectorConnForms, + toConnectorFromJobForms, + toConnectorToJobForms, + toFormConnectorFetchStmt, inputFetchStmt, 2); + loadForms(frameworkConnForms, frameworkJobForms, formFrameworkFetchStmt, inputFetchStmt, 2); MJob job = new MJob( fromConnectorId, toConnectorId, fromConnectionId, toConnectionId, - new MJobForms(fromJobForms), - new MJobForms(toJobForms), + new MJobForms(fromConnectorFromJobForms), + new MJobForms(toConnectorToJobForms), new MJobForms(frameworkJobForms)); job.setPersistenceId(id); @@ -1771,7 +1787,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { } } finally { closeResultSets(rsJob); - closeStatements(formConnectorFetchStmt, formFrameworkFetchStmt, inputFetchStmt); + closeStatements(fromFormConnectorFetchStmt, toFormConnectorFetchStmt, formFrameworkFetchStmt, inputFetchStmt); } return jobs; http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/server/pom.xml ---------------------------------------------------------------------- diff --git a/server/pom.xml b/server/pom.xml index dc89409..67baaa5 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -64,6 +64,11 @@ limitations under the License. 
<artifactId>sqoop-connector-generic-jdbc</artifactId> </dependency> + <dependency> + <groupId>org.apache.sqoop.connector</groupId> + <artifactId>sqoop-connector-hdfs</artifactId> + </dependency> + <!-- <dependency> <groupId>org.apache.sqoop.connector</groupId> http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java index fba5b1c..ff99f98 100644 --- a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java +++ b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java @@ -283,6 +283,7 @@ public class JobRequestHandler implements RequestHandler { MJob job = repository.findJob(jid); // @TODO(Abe): From/To + long connectorId = job.getConnectorId(Direction.FROM); bean = new JobBean(job); http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/test/pom.xml ---------------------------------------------------------------------- diff --git a/test/pom.xml b/test/pom.xml index 8001fce..7a80710 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -68,6 +68,11 @@ limitations under the License. 
</dependency> <dependency> + <groupId>org.apache.sqoop.connector</groupId> + <artifactId>sqoop-connector-hdfs</artifactId> + </dependency> + + <dependency> <groupId>org.codehaus.cargo</groupId> <artifactId>cargo-core-container-tomcat</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java index 5ec4fa4..af0f299 100644 --- a/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java +++ b/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java @@ -21,8 +21,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.mapred.JobConf; import org.apache.log4j.Logger; import org.apache.sqoop.client.SubmissionCallback; -import org.apache.sqoop.framework.configuration.OutputFormat; -import org.apache.sqoop.framework.configuration.StorageType; +import org.apache.sqoop.connector.hdfs.configuration.OutputFormat; +import org.apache.sqoop.connector.hdfs.configuration.StorageType; import org.apache.sqoop.model.MConnection; import org.apache.sqoop.model.MFormList; import org.apache.sqoop.model.MJob; http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableImportTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableImportTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableImportTest.java index 9e6f991..9171b8e 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableImportTest.java +++ 
b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableImportTest.java @@ -17,15 +17,7 @@ */ package org.apache.sqoop.integration.connector.jdbc.generic; -import org.apache.log4j.Logger; -import org.apache.sqoop.framework.configuration.OutputFormat; -import org.apache.sqoop.framework.configuration.StorageType; import org.apache.sqoop.test.testcases.ConnectorTestCase; -import org.apache.sqoop.model.MConnection; -import org.apache.sqoop.model.MFormList; -import org.apache.sqoop.model.MJob; -import org.apache.sqoop.model.MSubmission; -import org.junit.Test; import static org.junit.Assert.assertTrue; http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/imports/PartitionerTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/imports/PartitionerTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/imports/PartitionerTest.java index 1bc3b93..a0a4022 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/imports/PartitionerTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/imports/PartitionerTest.java @@ -17,15 +17,7 @@ */ package org.apache.sqoop.integration.connector.jdbc.generic.imports; -import org.apache.log4j.Logger; -import org.apache.sqoop.framework.configuration.OutputFormat; -import org.apache.sqoop.framework.configuration.StorageType; -import org.apache.sqoop.model.MConnection; -import org.apache.sqoop.model.MFormList; -import org.apache.sqoop.model.MJob; import org.apache.sqoop.test.testcases.ConnectorTestCase; -import org.apache.sqoop.test.utils.ParametrizedUtils; -import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; 
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/test/src/test/java/org/apache/sqoop/integration/server/SubmissionWithDisabledModelObjectsTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/sqoop/integration/server/SubmissionWithDisabledModelObjectsTest.java b/test/src/test/java/org/apache/sqoop/integration/server/SubmissionWithDisabledModelObjectsTest.java index 126ca32..1a7a3a8 100644 --- a/test/src/test/java/org/apache/sqoop/integration/server/SubmissionWithDisabledModelObjectsTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/server/SubmissionWithDisabledModelObjectsTest.java @@ -17,21 +17,10 @@ */ package org.apache.sqoop.integration.server; -import org.apache.sqoop.client.ClientError; -import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.framework.FrameworkError; -import org.apache.sqoop.framework.configuration.OutputFormat; -import org.apache.sqoop.framework.configuration.StorageType; -import org.apache.sqoop.model.MConnection; -import org.apache.sqoop.model.MFormList; -import org.apache.sqoop.model.MJob; import org.apache.sqoop.test.testcases.ConnectorTestCase; -import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import java.util.Arrays; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue;
