http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
index b7079dd..8061c78 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
@@ -30,10 +30,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
 import org.apache.sqoop.job.etl.HdfsExportExtractor;
 import org.apache.sqoop.job.etl.HdfsExportPartitioner;
 import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
@@ -45,6 +47,9 @@ import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.job.mr.ConfigurationUtils;
 import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
 import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
 import org.junit.Test;
 
 public class TestHdfsExtract extends TestCase {
@@ -53,12 +58,22 @@ public class TestHdfsExtract extends TestCase {
   private static final int NUMBER_OF_FILES = 5;
   private static final int NUMBER_OF_ROWS_PER_FILE = 1000;
 
-  private String indir;
+  private final String indir;
 
   public TestHdfsExtract() {
     indir = INPUT_ROOT + getClass().getSimpleName();
   }
 
+  @Override
+  public void setUp() throws IOException {
+    FileUtils.mkdirs(indir);
+  }
+
+  @Override
+  public void tearDown() throws IOException {
+    FileUtils.delete(indir);
+  }
+
   /**
    * Test case for validating the number of partitions creation
    * based on input.
@@ -68,12 +83,12 @@ public class TestHdfsExtract extends TestCase {
    */
   @Test
   public void testHdfsExportPartitioner() throws Exception {
-    FileUtils.delete(indir);
-    FileUtils.mkdirs(indir);
     createTextInput(null);
     Configuration conf = new Configuration();
     conf.set(JobConstants.HADOOP_INPUTDIR, indir);
+    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+        CSVIntermediateDataFormat.class.getName());
 
     HdfsExportPartitioner partitioner = new HdfsExportPartitioner();
     PrefixContext prefixContext = new PrefixContext(conf, "");
     int[] partitionValues = {2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 17};
@@ -87,87 +102,67 @@ public class TestHdfsExtract extends TestCase {
 
   @Test
   public void testUncompressedText() throws Exception {
-    FileUtils.delete(indir);
-    FileUtils.mkdirs(indir);
     createTextInput(null);
 
-    Configuration conf = new Configuration();
-    ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
-    conf.set(JobConstants.JOB_ETL_PARTITIONER,
-        HdfsExportPartitioner.class.getName());
-    conf.set(JobConstants.JOB_ETL_EXTRACTOR,
-        HdfsExportExtractor.class.getName());
-    conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
-    conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
-    JobUtils.runJob(conf);
+    JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
   }
 
   @Test
-  public void testCompressedText() throws Exception {
-    FileUtils.delete(indir);
-    FileUtils.mkdirs(indir);
+  public void testDefaultCompressedText() throws Exception {
     createTextInput(SqoopFileOutputFormat.DEFAULT_CODEC);
 
-    Configuration conf = new Configuration();
-    ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
-    conf.set(JobConstants.JOB_ETL_PARTITIONER,
-        HdfsExportPartitioner.class.getName());
-    conf.set(JobConstants.JOB_ETL_EXTRACTOR,
-        HdfsExportExtractor.class.getName());
-    conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
-    conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
-    JobUtils.runJob(conf);
+    JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
+  }
 
-    FileUtils.delete(indir);
-    FileUtils.mkdirs(indir);
+  @Test
+  public void testBZip2CompressedText() throws Exception {
     createTextInput(BZip2Codec.class);
 
-    conf.set(JobConstants.JOB_ETL_PARTITIONER,
-        HdfsExportPartitioner.class.getName());
-    conf.set(JobConstants.JOB_ETL_EXTRACTOR,
-        HdfsExportExtractor.class.getName());
-    conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
-    conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
-    JobUtils.runJob(conf);
+    JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
   }
 
   @Test
-  public void testCompressedSequence() throws Exception {
-    FileUtils.delete(indir);
-    FileUtils.mkdirs(indir);
+  public void testDefaultCompressedSequence() throws Exception {
     createSequenceInput(SqoopFileOutputFormat.DEFAULT_CODEC);
 
-    Configuration conf = new Configuration();
-    ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
-    conf.set(JobConstants.JOB_ETL_PARTITIONER,
-        HdfsExportPartitioner.class.getName());
-    conf.set(JobConstants.JOB_ETL_EXTRACTOR,
-        HdfsExportExtractor.class.getName());
-    conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
-    conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
-    JobUtils.runJob(conf);
+    JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
   }
 
   @Test
   public void testUncompressedSequence() throws Exception {
-    FileUtils.delete(indir);
-    FileUtils.mkdirs(indir);
     createSequenceInput(null);
 
+    JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
+  }
+
+  private Schema createSchema() {
+    Schema schema = new Schema("Test");
+    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+      .addColumn(new org.apache.sqoop.schema.type.Text("3"));
+    return schema;
+  }
+
+  private Configuration createConf() {
     Configuration conf = new Configuration();
     ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
-    conf.set(JobConstants.JOB_ETL_PARTITIONER,
+    conf.setIfUnset(JobConstants.JOB_ETL_PARTITIONER,
        HdfsExportPartitioner.class.getName());
-    conf.set(JobConstants.JOB_ETL_EXTRACTOR,
+    conf.setIfUnset(JobConstants.JOB_ETL_EXTRACTOR,
        HdfsExportExtractor.class.getName());
-    conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
-    conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
-    JobUtils.runJob(conf);
+    conf.setIfUnset(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
+    conf.setIfUnset(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
+    conf.setIfUnset(JobConstants.INTERMEDIATE_DATA_FORMAT,
+        CSVIntermediateDataFormat.class.getName());
+    conf.setIfUnset(JobConstants.HADOOP_INPUTDIR, indir);
+    return conf;
+  }
+
+  private Job createJob(Configuration conf, Schema schema) throws Exception {
+    Job job = new Job(conf);
+    ConfigurationUtils.setConnectorSchema(job, schema);
+    job.getConfiguration().set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+        CSVIntermediateDataFormat.class.getName());
+    return job;
   }
 
   private void createTextInput(Class<? extends CompressionCodec> clz)
@@ -227,11 +222,11 @@ public class TestHdfsExtract extends TestCase {
     SequenceFile.Writer filewriter;
     if (codec != null) {
       filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
-        conf, filepath, Text.class, NullWritable.class,
-        CompressionType.BLOCK, codec);
+          conf, filepath, Text.class, NullWritable.class,
+          CompressionType.BLOCK, codec);
     } else {
       filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
-        conf, filepath, Text.class, NullWritable.class, CompressionType.NONE);
+          conf, filepath, Text.class, NullWritable.class, CompressionType.NONE);
     }
 
     Text text = new Text();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
index f849aae..721bba6 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
@@ -26,6 +26,7 @@ import java.io.InputStreamReader;
 import java.util.LinkedList;
 import java.util.List;
 
+import com.google.common.base.Charsets;
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
@@ -33,7 +34,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
 import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
@@ -45,6 +48,9 @@ import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.job.mr.ConfigurationUtils;
 import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
 import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
 
 public class TestHdfsLoad extends TestCase {
 
@@ -68,13 +74,21 @@ public class TestHdfsLoad extends TestCase {
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
+    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+        CSVIntermediateDataFormat.class.getName());
     conf.set(JobConstants.HADOOP_OUTDIR, outdir);
-    JobUtils.runJob(conf);
+    Schema schema = new Schema("Test");
+    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+      .addColumn(new org.apache.sqoop.schema.type.Text("3"));
+
+    Job job = new Job(conf);
+    ConfigurationUtils.setConnectorSchema(job, schema);
+    JobUtils.runJob(job.getConfiguration());
 
     String fileName = outdir + "/" + OUTPUT_FILE;
     InputStream filestream = FileUtils.open(fileName);
     BufferedReader filereader = new BufferedReader(new InputStreamReader(
-        filestream, Data.CHARSET_NAME));
+        filestream, Charsets.UTF_8));
     verifyOutputText(filereader);
   }
 
@@ -86,9 +100,18 @@ public class TestHdfsLoad extends TestCase {
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
+    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+        CSVIntermediateDataFormat.class.getName());
     conf.set(JobConstants.HADOOP_OUTDIR, outdir);
     conf.setBoolean(JobConstants.HADOOP_COMPRESS, true);
-    JobUtils.runJob(conf);
+
+    Schema schema = new Schema("Test");
+    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+      .addColumn(new org.apache.sqoop.schema.type.Text("3"));
+
+    Job job = new Job(conf);
+    ConfigurationUtils.setConnectorSchema(job, schema);
+    JobUtils.runJob(job.getConfiguration());
 
     Class<? extends CompressionCodec> codecClass = conf.getClass(
         JobConstants.HADOOP_COMPRESS_CODEC, SqoopFileOutputFormat.DEFAULT_CODEC)
@@ -97,7 +120,7 @@ public class TestHdfsLoad extends TestCase {
     String fileName = outdir + "/" + OUTPUT_FILE + codec.getDefaultExtension();
     InputStream filestream = codec.createInputStream(FileUtils.open(fileName));
     BufferedReader filereader = new BufferedReader(new InputStreamReader(
-        filestream, Data.CHARSET_NAME));
+        filestream, Charsets.UTF_8));
     verifyOutputText(filereader);
   }
 
@@ -108,7 +131,7 @@ public class TestHdfsLoad extends TestCase {
     int index = START_ID*NUMBER_OF_ROWS_PER_ID;
     while ((actual = reader.readLine()) != null){
       data.setContent(new Object[] {
-          index, (double) index, String.valueOf(index) },
+          index, (double) index, new String(new byte[] {(byte)(index + 127)}, Charsets.ISO_8859_1) },
           Data.ARRAY_RECORD);
       expected = data.toString();
       index++;
@@ -129,8 +152,17 @@ public class TestHdfsLoad extends TestCase {
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
+    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+        CSVIntermediateDataFormat.class.getName());
     conf.set(JobConstants.HADOOP_OUTDIR, outdir);
-    JobUtils.runJob(conf);
+
+    Schema schema = new Schema("Test");
+    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+      .addColumn(new org.apache.sqoop.schema.type.Text("3"));
+
+    Job job = new Job(conf);
+    ConfigurationUtils.setConnectorSchema(job, schema);
+    JobUtils.runJob(job.getConfiguration());
 
     Path filepath = new Path(outdir,
         OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION);
@@ -147,10 +179,18 @@ public class TestHdfsLoad extends TestCase {
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
+    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+        CSVIntermediateDataFormat.class.getName());
     conf.set(JobConstants.HADOOP_OUTDIR, outdir);
     conf.setBoolean(JobConstants.HADOOP_COMPRESS, true);
-    JobUtils.runJob(conf);
 
+    Schema schema = new Schema("Test");
+    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+      .addColumn(new org.apache.sqoop.schema.type.Text("3"));
+
+    Job job = new Job(conf);
+    ConfigurationUtils.setConnectorSchema(job, schema);
+    JobUtils.runJob(job.getConfiguration());
 
     Path filepath = new Path(outdir, OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION);
     SequenceFile.Reader filereader = new SequenceFile.Reader(filepath.getFileSystem(conf), filepath, conf);
@@ -164,7 +204,7 @@ public class TestHdfsLoad extends TestCase {
     Data data = new Data();
     while (reader.next(actual)){
       data.setContent(new Object[] {
-          index, (double) index, String.valueOf(index) },
+          index, (double) index, new String(new byte[] {(byte)(index + 127)}, Charsets.ISO_8859_1) },
          Data.ARRAY_RECORD);
      expected.set(data.toString());
      index++;
@@ -225,7 +265,7 @@ public class TestHdfsLoad extends TestCase {
       Object[] array = new Object[] {
         id * NUMBER_OF_ROWS_PER_ID + row,
         (double) (id * NUMBER_OF_ROWS_PER_ID + row),
-        String.valueOf(id*NUMBER_OF_ROWS_PER_ID+row)
+        new String(new byte[]{(byte)(id * NUMBER_OF_ROWS_PER_ID + row + 127)}, Charsets.ISO_8859_1)
       };
       context.getDataWriter().writeArrayRecord(array);
     }
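(Aside, not part of the patch: the new expected values above build a one-character string from the byte (index + 127) via ISO-8859-1. A tiny self-contained round-trip check of that encoding trick, with a hypothetical class name, looks like this:)

    import com.google.common.base.Charsets;

    public class Latin1RoundTrip {
      public static void main(String[] args) {
        int index = 130;                                 // any row index the test might generate
        byte raw = (byte) (index + 127);                 // the cast wraps values above 127 into a signed byte
        String s = new String(new byte[] { raw }, Charsets.ISO_8859_1);
        // ISO-8859-1 maps every byte value 0x00-0xFF to exactly one char, so the
        // round trip back to bytes is lossless and always yields a single character.
        byte[] back = s.getBytes(Charsets.ISO_8859_1);
        System.out.println(s.length() == 1 && back[0] == raw);   // prints: true
      }
    }
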
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index 7b264c6..ba16b3c 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
 import org.apache.sqoop.job.etl.Loader;
@@ -42,12 +43,17 @@ import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
 import org.apache.sqoop.job.etl.PartitionerContext;
 import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.SqoopWritable;
 import org.apache.sqoop.job.mr.ConfigurationUtils;
 import org.apache.sqoop.job.mr.SqoopInputFormat;
 import org.apache.sqoop.job.mr.SqoopMapper;
 import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
 import org.apache.sqoop.job.mr.SqoopSplit;
 import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
+import org.apache.sqoop.schema.type.Text;
 
 public class TestMapReduce extends TestCase {
 
@@ -59,6 +65,8 @@ public class TestMapReduce extends TestCase {
     Configuration conf = new Configuration();
     ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+        CSVIntermediateDataFormat.class.getName());
 
     Job job = new Job(conf);
     SqoopInputFormat inputformat = new SqoopInputFormat();
@@ -77,8 +85,15 @@ public class TestMapReduce extends TestCase {
     ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+        CSVIntermediateDataFormat.class.getName());
+    Schema schema = new Schema("Test");
+    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+      .addColumn(new org.apache.sqoop.schema.type.Text("3"));
 
-    JobUtils.runJob(conf, SqoopInputFormat.class, SqoopMapper.class,
+    Job job = new Job(conf);
+    ConfigurationUtils.setConnectorSchema(job, schema);
+    JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
         DummyOutputFormat.class);
   }
 
@@ -88,8 +103,15 @@ public class TestMapReduce extends TestCase {
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
+    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+        CSVIntermediateDataFormat.class.getName());
+    Schema schema = new Schema("Test");
+    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+      .addColumn(new Text("3"));
 
-    JobUtils.runJob(conf, SqoopInputFormat.class, SqoopMapper.class,
+    Job job = new Job(conf);
+    ConfigurationUtils.setConnectorSchema(job, schema);
+    JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
         SqoopNullOutputFormat.class);
   }
@@ -152,14 +174,14 @@ public class TestMapReduce extends TestCase {
   }
 
   public static class DummyOutputFormat
-      extends OutputFormat<Data, NullWritable> {
+      extends OutputFormat<SqoopWritable, NullWritable> {
     @Override
     public void checkOutputSpecs(JobContext context) {
       // do nothing
     }
 
     @Override
-    public RecordWriter<Data, NullWritable> getRecordWriter(
+    public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(
         TaskAttemptContext context) {
       return new DummyRecordWriter();
     }
@@ -170,12 +192,13 @@ public class TestMapReduce extends TestCase {
   }
 
   public static class DummyRecordWriter
-      extends RecordWriter<Data, NullWritable> {
+      extends RecordWriter<SqoopWritable, NullWritable> {
     private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
     private Data data = new Data();
 
     @Override
-    public void write(Data key, NullWritable value) {
+    public void write(SqoopWritable key, NullWritable value) {
+
       data.setContent(new Object[] {
         index,
         (double) index,
@@ -215,22 +238,22 @@ public class TestMapReduce extends TestCase {
   public static class DummyLoader extends Loader {
     private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
     private Data expected = new Data();
-    private Data actual = new Data();
+    private CSVIntermediateDataFormat actual = new CSVIntermediateDataFormat();
 
     @Override
     public void load(LoaderContext context, Object oc, Object oj) throws Exception{
-      Object[] array;
-      while ((array = context.getDataReader().readArrayRecord()) != null) {
-        actual.setContent(array, Data.ARRAY_RECORD);
+      String data;
+      while ((data = context.getDataReader().readTextRecord()) != null) {
+//        actual.setSchema(context.getSchema());
+//        actual.setObjectData(array, false);
         expected.setContent(new Object[] {
           index,
           (double) index,
           String.valueOf(index)},
           Data.ARRAY_RECORD);
         index++;
-
-        assertEquals(expected.toString(), actual.toString());
+        assertEquals(expected.toString(), data);
       }
     }
   }
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java
new file mode 100644
index 0000000..b78b140
--- /dev/null
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.job.io;
+
+import com.google.common.base.Charsets;
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
+import org.apache.sqoop.job.JobConstants;
+
+public class SqoopWritableTest extends TestCase {
+
+  private final SqoopWritable writable = new SqoopWritable();
+
+  public void testStringInStringOut() {
+    String testData = "Live Long and prosper";
+    writable.setString(testData);
+    Assert.assertEquals(testData,writable.getString());
+  }
+
+  public void testDataWritten() throws IOException {
+    String testData = "One ring to rule them all";
+    byte[] testDataBytes = testData.getBytes(Charsets.UTF_8);
+    writable.setString(testData);
+    ByteArrayOutputStream ostream = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(ostream);
+    writable.write(out);
+    byte[] written = ostream.toByteArray();
+    InputStream instream = new ByteArrayInputStream(written);
+    DataInput in = new DataInputStream(instream);
+    String readData = in.readUTF();
+    Assert.assertEquals(testData, readData);
+  }
+
+  public void testDataRead() throws IOException {
+    String testData = "Brandywine Bridge - 20 miles!";
+    ByteArrayOutputStream ostream = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(ostream);
+    out.writeUTF(testData);
+    InputStream instream = new ByteArrayInputStream(ostream.toByteArray());
+    DataInput in = new DataInputStream(instream);
+    writable.readFields(in);
+    Assert.assertEquals(testData, writable.getString());
+  }
+
+  public void testWriteReadUsingStream() throws IOException {
+    String testData = "You shall not pass";
+    ByteArrayOutputStream ostream = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(ostream);
+    writable.setString(testData);
+    writable.write(out);
+    byte[] written = ostream.toByteArray();
+
+    //Don't test what the data is, test that SqoopWritable can read it.
+    InputStream instream = new ByteArrayInputStream(written);
+    SqoopWritable newWritable = new SqoopWritable();
+    DataInput in = new DataInputStream(instream);
+    newWritable.readFields(in);
+    Assert.assertEquals(testData, newWritable.getString());
+    ostream.close();
+    instream.close();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
index bee8ab7..1f55f1b 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
@@ -23,11 +23,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.etl.LoaderContext;
-import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.job.io.SqoopWritable;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -47,7 +49,7 @@ public class TestSqoopOutputFormatLoadExecutor {
 
     @Override
     public void load(LoaderContext context, Object cc, Object jc) throws Exception {
-      context.getDataReader().readContent(Data.CSV_RECORD);
+      context.getDataReader().readTextRecord();
       throw new BrokenBarrierException();
     }
   }
@@ -62,7 +64,7 @@ public class TestSqoopOutputFormatLoadExecutor {
       int runCount = 0;
       Object o;
       String[] arr;
-      while ((o = context.getDataReader().readContent(Data.CSV_RECORD)) != null) {
+      while ((o = context.getDataReader().readTextRecord()) != null) {
        arr = o.toString().split(",");
        Assert.assertEquals(100, arr.length);
        for (int i = 0; i < arr.length; i++) {
@@ -84,7 +86,7 @@ public class TestSqoopOutputFormatLoadExecutor {
 
     @Override
     public void load(LoaderContext context, Object cc, Object jc) throws Exception {
-      String[] arr = context.getDataReader().readContent(Data.CSV_RECORD).toString().split(",");
+      String[] arr = context.getDataReader().readTextRecord().toString().split(",");
       Assert.assertEquals(100, arr.length);
       for (int i = 0; i < arr.length; i++) {
         Assert.assertEquals(i, Integer.parseInt(arr[i]));
@@ -103,7 +105,7 @@ public class TestSqoopOutputFormatLoadExecutor {
       int runCount = 0;
       Object o;
       String[] arr;
-      while ((o = context.getDataReader().readContent(Data.CSV_RECORD)) != null) {
+      while ((o = context.getDataReader().readTextRecord()) != null) {
        arr = o.toString().split(",");
        Assert.assertEquals(100, arr.length);
        for (int i = 0; i < arr.length; i++) {
@@ -119,6 +121,7 @@ public class TestSqoopOutputFormatLoadExecutor {
   @Before
   public void setUp() {
     conf = new Configuration();
+    conf.setIfUnset(JobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
   }
 
@@ -128,12 +131,14 @@ public class TestSqoopOutputFormatLoadExecutor {
     conf.set(JobConstants.JOB_ETL_LOADER, ThrowingLoader.class.getName());
     SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(true,
        ThrowingLoader.class.getName());
-    RecordWriter<Data, NullWritable> writer = executor.getRecordWriter();
-    Data data = new Data();
+    RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
+    IntermediateDataFormat data = new CSVIntermediateDataFormat();
+    SqoopWritable writable = new SqoopWritable();
     try {
       for (int count = 0; count < 100; count++) {
-        data.setContent(String.valueOf(count), Data.CSV_RECORD);
-        writer.write(data, null);
+        data.setTextData(String.valueOf(count));
+        writable.setString(data.getTextData());
+        writer.write(writable, null);
       }
     } catch (SqoopException ex) {
       throw ex.getCause();
@@ -146,8 +151,9 @@ public class TestSqoopOutputFormatLoadExecutor {
     conf.set(JobConstants.JOB_ETL_LOADER, GoodContinuousLoader.class.getName());
     SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(true,
         GoodContinuousLoader.class.getName());
-    RecordWriter<Data, NullWritable> writer = executor.getRecordWriter();
-    Data data = new Data();
+    RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
+    IntermediateDataFormat data = new CSVIntermediateDataFormat();
+    SqoopWritable writable = new SqoopWritable();
     for (int i = 0; i < 10; i++) {
       StringBuilder builder = new StringBuilder();
       for (int count = 0; count < 100; count++) {
@@ -156,8 +162,9 @@ public class TestSqoopOutputFormatLoadExecutor {
           builder.append(",");
         }
       }
-      data.setContent(builder.toString(), Data.CSV_RECORD);
-      writer.write(data, null);
+      data.setTextData(builder.toString());
+      writable.setString(data.getTextData());
+      writer.write(writable, null);
     }
     writer.close(null);
   }
@@ -166,8 +173,9 @@ public class TestSqoopOutputFormatLoadExecutor {
   public void testSuccessfulLoader() throws Throwable {
     SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(true,
         GoodLoader.class.getName());
-    RecordWriter<Data, NullWritable> writer = executor.getRecordWriter();
-    Data data = new Data();
+    RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
+    IntermediateDataFormat data = new CSVIntermediateDataFormat();
+    SqoopWritable writable = new SqoopWritable();
     StringBuilder builder = new StringBuilder();
     for (int count = 0; count < 100; count++) {
       builder.append(String.valueOf(count));
@@ -175,8 +183,10 @@ public class TestSqoopOutputFormatLoadExecutor {
         builder.append(",");
       }
     }
-    data.setContent(builder.toString(), Data.CSV_RECORD);
-    writer.write(data, null);
+    data.setTextData(builder.toString());
+    writable.setString(data.getTextData());
+    writer.write(writable, null);
+
+    //Allow writer to complete.
     TimeUnit.SECONDS.sleep(5);
     writer.close(null);
@@ -189,8 +199,9 @@ public class TestSqoopOutputFormatLoadExecutor {
     conf.set(JobConstants.JOB_ETL_LOADER, ThrowingContinuousLoader.class.getName());
     SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(true,
         ThrowingContinuousLoader.class.getName());
-    RecordWriter<Data, NullWritable> writer = executor.getRecordWriter();
-    Data data = new Data();
+    RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
+    IntermediateDataFormat data = new CSVIntermediateDataFormat();
+    SqoopWritable writable = new SqoopWritable();
     try {
       for (int i = 0; i < 10; i++) {
         StringBuilder builder = new StringBuilder();
@@ -200,8 +211,9 @@ public class TestSqoopOutputFormatLoadExecutor {
            builder.append(",");
          }
        }
-        data.setContent(builder.toString(), Data.CSV_RECORD);
-        writer.write(data, null);
+        data.setTextData(builder.toString());
+        writable.setString(data.getTextData());
+        writer.write(writable, null);
       }
       writer.close(null);
     } catch (SqoopException ex) {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1e2f005..a722c74 100644
--- a/pom.xml
+++ b/pom.xml
@@ -143,12 +143,6 @@ limitations under the License.
       </dependency>
 
       <dependency>
-        <groupId>com.google.guava</groupId>
-        <artifactId>guava</artifactId>
-        <version>${guava.version}</version>
-      </dependency>
-
-      <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-core</artifactId>
         <version>${hadoop.1.version}</version>
@@ -345,6 +339,11 @@ limitations under the License.
        <version>${commons-lang.version}</version>
      </dependency>
      <dependency>
+        <groupId>com.google.guava</groupId>
+        <artifactId>guava</artifactId>
+        <version>${guava.version}</version>
+      </dependency>
+      <dependency>
        <groupId>javax.servlet</groupId>
        <artifactId>servlet-api</artifactId>
        <version>${servlet.version}</version>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/spi/pom.xml
----------------------------------------------------------------------
diff --git a/spi/pom.xml b/spi/pom.xml
index 0b240e8..43f17d4 100644
--- a/spi/pom.xml
+++ b/spi/pom.xml
@@ -36,5 +36,10 @@ limitations under the License.
       <groupId>org.apache.sqoop</groupId>
       <artifactId>sqoop-common</artifactId>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.sqoop</groupId>
+      <artifactId>connector-sdk</artifactId>
+    </dependency>
   </dependencies>
 </project>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
index 2becc56..50eb940 100644
--- a/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
+++ b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
@@ -20,6 +20,8 @@ package org.apache.sqoop.connector.spi;
 import java.util.Locale;
 import java.util.ResourceBundle;
 
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
 import org.apache.sqoop.job.etl.Exporter;
 import org.apache.sqoop.job.etl.Importer;
 import org.apache.sqoop.model.MJob;
@@ -79,4 +81,14 @@ public abstract class SqoopConnector {
    */
   public abstract MetadataUpgrader getMetadataUpgrader();
 
+  /**
+   * Returns the {@linkplain IntermediateDataFormat} this connector
+   * can return natively in. This will support retrieving the data as text
+   * and an array of objects. This should never return null.
+   *
+   * @return {@linkplain IntermediateDataFormat} object
+   */
+  public Class<? extends IntermediateDataFormat<?>> getIntermediateDataFormat() {
+    return CSVIntermediateDataFormat.class;
+  }
 }
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
index bfc28ef..a05274a 100644
--- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -198,6 +198,7 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
     ConfigurationUtils.setConfigConnectorJob(job, request.getConfigConnectorJob());
     ConfigurationUtils.setConfigFrameworkConnection(job, request.getConfigFrameworkConnection());
     ConfigurationUtils.setConfigFrameworkJob(job, request.getConfigFrameworkJob());
+    ConfigurationUtils.setConnectorSchema(job, request.getSummary().getConnectorSchema());
 
     if(request.getJobName() != null) {
       job.setJobName("Sqoop: " + request.getJobName());
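(Aside, not part of the patch: a minimal sketch of how the pieces above compose. It assumes only classes and constants that appear in this commit -- JobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat, IntermediateDataFormat, SqoopWritable -- while the sketch's own package and class name are hypothetical.)

    package org.apache.sqoop.example;   // hypothetical package for this sketch

    import org.apache.hadoop.conf.Configuration;
    import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
    import org.apache.sqoop.connector.idf.IntermediateDataFormat;
    import org.apache.sqoop.job.JobConstants;
    import org.apache.sqoop.job.io.SqoopWritable;

    public class IntermediateDataFormatSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The framework records the connector's intermediate data format class
        // (SqoopConnector#getIntermediateDataFormat) in the job configuration,
        // just as the updated tests above do.
        conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
            CSVIntermediateDataFormat.class.getName());

        // Execution-side code can then instantiate whatever class the
        // configuration names, falling back to the CSV implementation.
        IntermediateDataFormat<?> idf = (IntermediateDataFormat<?>)
            conf.getClass(JobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class)
                .newInstance();

        // Records move between the intermediate data format and the mapper
        // output key (SqoopWritable) as text.
        idf.setTextData("1,2.0,'3'");
        SqoopWritable writable = new SqoopWritable();
        writable.setString(idf.getTextData());
        System.out.println(writable.getString());
      }
    }

(Since SqoopConnector#getIntermediateDataFormat is non-abstract and returns CSVIntermediateDataFormat.class, existing connectors that do not override it keep advertising the CSV format, which is also the fallback used in the sketch.)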
