Repository: sqoop Updated Branches: refs/heads/SQOOP-1367 3bb7ff834 -> 2b214cdd2
SQOOP-1455: Sqoop2: From/To: Re-enable MapreduceExecutionEngine tests (Abraham Elmahrek via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/2b214cdd Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/2b214cdd Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/2b214cdd Branch: refs/heads/SQOOP-1367 Commit: 2b214cdd27b9634045a3d1608a5b2b0b0f974293 Parents: 3bb7ff8 Author: Jarek Jarcec Cecho <[email protected]> Authored: Tue Aug 19 19:11:13 2014 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Tue Aug 19 19:11:13 2014 -0700 ---------------------------------------------------------------------- .../org/apache/sqoop/job/TestMapReduce.java | 398 +++++++++---------- .../java/org/apache/sqoop/job/io/TestData.java | 178 ++++----- .../sqoop/job/mr/TestConfigurationUtils.java | 272 ++++++------- .../mr/TestSqoopOutputFormatLoadExecutor.java | 364 +++++++++-------- 4 files changed, 601 insertions(+), 611 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/2b214cdd/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java index 4219e9e..2dfc487 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java @@ -34,7 +34,8 @@ import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; -//import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; +import org.apache.sqoop.common.Direction; 
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; import org.apache.sqoop.job.etl.Extractor; import org.apache.sqoop.job.etl.ExtractorContext; import org.apache.sqoop.job.etl.Loader; @@ -49,7 +50,6 @@ import org.apache.sqoop.job.mr.SqoopInputFormat; import org.apache.sqoop.job.mr.SqoopMapper; import org.apache.sqoop.job.mr.SqoopNullOutputFormat; import org.apache.sqoop.job.mr.SqoopSplit; -import org.apache.sqoop.model.MJob; import org.apache.sqoop.schema.Schema; import org.apache.sqoop.schema.type.FixedPoint; import org.apache.sqoop.schema.type.FloatingPoint; @@ -57,204 +57,198 @@ import org.apache.sqoop.schema.type.Text; public class TestMapReduce extends TestCase { -// private static final int START_PARTITION = 1; -// private static final int NUMBER_OF_PARTITIONS = 9; -// private static final int NUMBER_OF_ROWS_PER_PARTITION = 10; -// -// public void testInputFormat() throws Exception { -// Configuration conf = new Configuration(); -// ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT); -// conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); -// conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT, -// CSVIntermediateDataFormat.class.getName()); -// Job job = new Job(conf); -// -// SqoopInputFormat inputformat = new SqoopInputFormat(); -// List<InputSplit> splits = inputformat.getSplits(job); -// assertEquals(9, splits.size()); -// -// for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) { -// SqoopSplit split = (SqoopSplit)splits.get(id-1); -// DummyPartition partition = (DummyPartition)split.getPartition(); -// assertEquals(id, partition.getId()); -// } -// } -// -// public void testMapper() throws Exception { -// Configuration conf = new Configuration(); -// ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT); -// conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); -// conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); -// 
conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT, -// CSVIntermediateDataFormat.class.getName()); -// Schema schema = new Schema("Test"); -// schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) -// .addColumn(new org.apache.sqoop.schema.type.Text("3")); -// -// Job job = new Job(conf); -// ConfigurationUtils.setConnectorSchema(job, schema); -// JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class, -// DummyOutputFormat.class); -// } -// -// public void testOutputFormat() throws Exception { -// Configuration conf = new Configuration(); -// ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT); -// conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); -// conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); -// conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName()); -// conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT, -// CSVIntermediateDataFormat.class.getName()); -// Schema schema = new Schema("Test"); -// schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) -// .addColumn(new Text("3")); -// -// Job job = new Job(conf); -// ConfigurationUtils.setConnectorSchema(job, schema); -// JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class, -// SqoopNullOutputFormat.class); -// } -// -// public static class DummyPartition extends Partition { -// private int id; -// -// public void setId(int id) { -// this.id = id; -// } -// -// public int getId() { -// return id; -// } -// -// @Override -// public void readFields(DataInput in) throws IOException { -// id = in.readInt(); -// } -// -// @Override -// public void write(DataOutput out) throws IOException { -// out.writeInt(id); -// } -// -// @Override -// public String toString() { -// return Integer.toString(id); -// } -// } -// -// public static class DummyPartitioner extends Partitioner { -// @Override -// public List<Partition> getPartitions(PartitionerContext context, Object 
oc, Object oj) { -// List<Partition> partitions = new LinkedList<Partition>(); -// for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) { -// DummyPartition partition = new DummyPartition(); -// partition.setId(id); -// partitions.add(partition); -// } -// return partitions; -// } -// } -// -// public static class DummyExtractor extends Extractor { -// @Override -// public void extract(ExtractorContext context, Object oc, Object oj, Object partition) { -// int id = ((DummyPartition)partition).getId(); -// for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) { -// context.getDataWriter().writeArrayRecord(new Object[] { -// id * NUMBER_OF_ROWS_PER_PARTITION + row, -// (double) (id * NUMBER_OF_ROWS_PER_PARTITION + row), -// String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)}); -// } -// } -// -// @Override -// public long getRowsRead() { -// return NUMBER_OF_ROWS_PER_PARTITION; -// } -// } -// -// public static class DummyOutputFormat -// extends OutputFormat<SqoopWritable, NullWritable> { -// @Override -// public void checkOutputSpecs(JobContext context) { -// // do nothing -// } -// -// @Override -// public RecordWriter<SqoopWritable, NullWritable> getRecordWriter( -// TaskAttemptContext context) { -// return new DummyRecordWriter(); -// } -// -// @Override -// public OutputCommitter getOutputCommitter(TaskAttemptContext context) { -// return new DummyOutputCommitter(); -// } -// -// public static class DummyRecordWriter -// extends RecordWriter<SqoopWritable, NullWritable> { -// private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION; -// private Data data = new Data(); -// -// @Override -// public void write(SqoopWritable key, NullWritable value) { -// -// data.setContent(new Object[] { -// index, -// (double) index, -// String.valueOf(index)}, -// Data.ARRAY_RECORD); -// index++; -// -// assertEquals(data.toString(), key.toString()); -// } -// -// @Override -// public void close(TaskAttemptContext context) { -// // do nothing -// } 
-// } -// -// public static class DummyOutputCommitter extends OutputCommitter { -// @Override -// public void setupJob(JobContext jobContext) { } -// -// @Override -// public void setupTask(TaskAttemptContext taskContext) { } -// -// @Override -// public void commitTask(TaskAttemptContext taskContext) { } -// -// @Override -// public void abortTask(TaskAttemptContext taskContext) { } -// -// @Override -// public boolean needsTaskCommit(TaskAttemptContext taskContext) { -// return false; -// } -// } -// } -// -// public static class DummyLoader extends Loader { -// private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION; -// private Data expected = new Data(); -// private CSVIntermediateDataFormat actual = new CSVIntermediateDataFormat(); -// -// @Override -// public void load(LoaderContext context, Object oc, Object oj) throws Exception{ -// String data; -// while ((data = context.getDataReader().readTextRecord()) != null) { -// -//// actual.setSchema(context.getSchema()); -//// actual.setObjectData(array, false); -// expected.setContent(new Object[] { -// index, -// (double) index, -// String.valueOf(index)}, -// Data.ARRAY_RECORD); -// index++; -// assertEquals(expected.toString(), data); -// } -// } -// } + private static final int START_PARTITION = 1; + private static final int NUMBER_OF_PARTITIONS = 9; + private static final int NUMBER_OF_ROWS_PER_PARTITION = 10; + + public void testInputFormat() throws Exception { + Configuration conf = new Configuration(); + conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); + conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT, + CSVIntermediateDataFormat.class.getName()); + Job job = new Job(conf); + + SqoopInputFormat inputformat = new SqoopInputFormat(); + List<InputSplit> splits = inputformat.getSplits(job); + assertEquals(9, splits.size()); + + for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) { + SqoopSplit split = (SqoopSplit)splits.get(id-1); + DummyPartition partition 
= (DummyPartition)split.getPartition(); + assertEquals(id, partition.getId()); + } + } + + public void testMapper() throws Exception { + Configuration conf = new Configuration(); + conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); + conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); + conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT, + CSVIntermediateDataFormat.class.getName()); + Schema schema = new Schema("Test"); + schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) + .addColumn(new org.apache.sqoop.schema.type.Text("3")); + + Job job = new Job(conf); + ConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema); + JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class, + DummyOutputFormat.class); + } + + public void testOutputFormat() throws Exception { + Configuration conf = new Configuration(); + conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); + conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); + conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName()); + conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT, + CSVIntermediateDataFormat.class.getName()); + Schema schema = new Schema("Test"); + schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) + .addColumn(new Text("3")); + + Job job = new Job(conf); + ConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema); + JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class, + SqoopNullOutputFormat.class); + } + + public static class DummyPartition extends Partition { + private int id; + + public void setId(int id) { + this.id = id; + } + + public int getId() { + return id; + } + + @Override + public void readFields(DataInput in) throws IOException { + id = in.readInt(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(id); + } + + @Override + public String 
toString() { + return Integer.toString(id); + } + } + + public static class DummyPartitioner extends Partitioner { + @Override + public List<Partition> getPartitions(PartitionerContext context, Object oc, Object oj) { + List<Partition> partitions = new LinkedList<Partition>(); + for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) { + DummyPartition partition = new DummyPartition(); + partition.setId(id); + partitions.add(partition); + } + return partitions; + } + } + + public static class DummyExtractor extends Extractor { + @Override + public void extract(ExtractorContext context, Object oc, Object oj, Object partition) { + int id = ((DummyPartition)partition).getId(); + for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) { + context.getDataWriter().writeArrayRecord(new Object[] { + id * NUMBER_OF_ROWS_PER_PARTITION + row, + (double) (id * NUMBER_OF_ROWS_PER_PARTITION + row), + String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)}); + } + } + + @Override + public long getRowsRead() { + return NUMBER_OF_ROWS_PER_PARTITION; + } + } + + public static class DummyOutputFormat + extends OutputFormat<SqoopWritable, NullWritable> { + @Override + public void checkOutputSpecs(JobContext context) { + // do nothing + } + + @Override + public RecordWriter<SqoopWritable, NullWritable> getRecordWriter( + TaskAttemptContext context) { + return new DummyRecordWriter(); + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) { + return new DummyOutputCommitter(); + } + + public static class DummyRecordWriter + extends RecordWriter<SqoopWritable, NullWritable> { + private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION; + private Data data = new Data(); + + @Override + public void write(SqoopWritable key, NullWritable value) { + + data.setContent(new Object[] { + index, + (double) index, + String.valueOf(index)}, + Data.ARRAY_RECORD); + index++; + + assertEquals(data.toString(), key.toString()); + } + + @Override + 
public void close(TaskAttemptContext context) { + // do nothing + } + } + + public static class DummyOutputCommitter extends OutputCommitter { + @Override + public void setupJob(JobContext jobContext) { } + + @Override + public void setupTask(TaskAttemptContext taskContext) { } + + @Override + public void commitTask(TaskAttemptContext taskContext) { } + + @Override + public void abortTask(TaskAttemptContext taskContext) { } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskContext) { + return false; + } + } + } + + public static class DummyLoader extends Loader { + private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION; + private Data expected = new Data(); + private CSVIntermediateDataFormat actual = new CSVIntermediateDataFormat(); + + @Override + public void load(LoaderContext context, Object oc, Object oj) throws Exception{ + String data; + while ((data = context.getDataReader().readTextRecord()) != null) { + expected.setContent(new Object[] { + index, + (double) index, + String.valueOf(index)}, + Data.ARRAY_RECORD); + index++; + assertEquals(expected.toString(), data); + } + } + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/2b214cdd/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java index 48fb61f..91df426 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java @@ -25,94 +25,94 @@ import org.junit.Test; public class TestData extends TestCase { -// private static final double TEST_NUMBER = Math.PI + 100; -// @Test -// public void testArrayToCsv() throws Exception { -// Data data = new Data(); -// String expected; -// String actual; -// -// // with special characters: -// 
expected = -// Long.valueOf((long)TEST_NUMBER) + "," + -// Double.valueOf(TEST_NUMBER) + "," + -// "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," + -// Arrays.toString(new byte[] {1, 2, 3, 4, 5}); -// data.setContent(new Object[] { -// Long.valueOf((long)TEST_NUMBER), -// Double.valueOf(TEST_NUMBER), -// String.valueOf(TEST_NUMBER) + "',s", -// new byte[] {1, 2, 3, 4, 5} }, -// Data.ARRAY_RECORD); -// actual = (String)data.getContent(Data.CSV_RECORD); -// assertEquals(expected, actual); -// -// // with null characters: -// expected = -// Long.valueOf((long)TEST_NUMBER) + "," + -// Double.valueOf(TEST_NUMBER) + "," + -// "null" + "," + -// Arrays.toString(new byte[] {1, 2, 3, 4, 5}); -// data.setContent(new Object[] { -// Long.valueOf((long)TEST_NUMBER), -// Double.valueOf(TEST_NUMBER), -// null, -// new byte[] {1, 2, 3, 4, 5} }, -// Data.ARRAY_RECORD); -// actual = (String)data.getContent(Data.CSV_RECORD); -// assertEquals(expected, actual); -// } -// -// @Test -// public void testCsvToArray() throws Exception { -// Data data = new Data(); -// Object[] expected; -// Object[] actual; -// -// // with special characters: -// expected = new Object[] { -// Long.valueOf((long)TEST_NUMBER), -// Double.valueOf(TEST_NUMBER), -// String.valueOf(TEST_NUMBER) + "',s", -// new byte[] {1, 2, 3, 4, 5} }; -// data.setContent( -// Long.valueOf((long)TEST_NUMBER) + "," + -// Double.valueOf(TEST_NUMBER) + "," + -// "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," + -// Arrays.toString(new byte[] {1, 2, 3, 4, 5}), -// Data.CSV_RECORD); -// actual = (Object[])data.getContent(Data.ARRAY_RECORD); -// assertEquals(expected.length, actual.length); -// for (int c=0; c<expected.length; c++) { -// assertEquals(expected[c], actual[c]); -// } -// -// // with null characters: -// expected = new Object[] { -// Long.valueOf((long)TEST_NUMBER), -// Double.valueOf(TEST_NUMBER), -// null, -// new byte[] {1, 2, 3, 4, 5} }; -// data.setContent( -// Long.valueOf((long)TEST_NUMBER) + "," + -// 
Double.valueOf(TEST_NUMBER) + "," + -// "null" + "," + -// Arrays.toString(new byte[] {1, 2, 3, 4, 5}), -// Data.CSV_RECORD); -// actual = (Object[])data.getContent(Data.ARRAY_RECORD); -// assertEquals(expected.length, actual.length); -// for (int c=0; c<expected.length; c++) { -// assertEquals(expected[c], actual[c]); -// } -// } -// -// public static void assertEquals(Object expected, Object actual) { -// if (expected instanceof byte[]) { -// assertEquals(Arrays.toString((byte[])expected), -// Arrays.toString((byte[])actual)); -// } else { -// TestCase.assertEquals(expected, actual); -// } -// } + private static final double TEST_NUMBER = Math.PI + 100; + @Test + public void testArrayToCsv() throws Exception { + Data data = new Data(); + String expected; + String actual; + + // with special characters: + expected = + Long.valueOf((long)TEST_NUMBER) + "," + + Double.valueOf(TEST_NUMBER) + "," + + "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," + + Arrays.toString(new byte[] {1, 2, 3, 4, 5}); + data.setContent(new Object[] { + Long.valueOf((long)TEST_NUMBER), + Double.valueOf(TEST_NUMBER), + String.valueOf(TEST_NUMBER) + "',s", + new byte[] {1, 2, 3, 4, 5} }, + Data.ARRAY_RECORD); + actual = (String)data.getContent(Data.CSV_RECORD); + assertEquals(expected, actual); + + // with null characters: + expected = + Long.valueOf((long)TEST_NUMBER) + "," + + Double.valueOf(TEST_NUMBER) + "," + + "null" + "," + + Arrays.toString(new byte[] {1, 2, 3, 4, 5}); + data.setContent(new Object[] { + Long.valueOf((long)TEST_NUMBER), + Double.valueOf(TEST_NUMBER), + null, + new byte[] {1, 2, 3, 4, 5} }, + Data.ARRAY_RECORD); + actual = (String)data.getContent(Data.CSV_RECORD); + assertEquals(expected, actual); + } + + @Test + public void testCsvToArray() throws Exception { + Data data = new Data(); + Object[] expected; + Object[] actual; + + // with special characters: + expected = new Object[] { + Long.valueOf((long)TEST_NUMBER), + Double.valueOf(TEST_NUMBER), + 
String.valueOf(TEST_NUMBER) + "',s", + new byte[] {1, 2, 3, 4, 5} }; + data.setContent( + Long.valueOf((long)TEST_NUMBER) + "," + + Double.valueOf(TEST_NUMBER) + "," + + "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," + + Arrays.toString(new byte[] {1, 2, 3, 4, 5}), + Data.CSV_RECORD); + actual = (Object[])data.getContent(Data.ARRAY_RECORD); + assertEquals(expected.length, actual.length); + for (int c=0; c<expected.length; c++) { + assertEquals(expected[c], actual[c]); + } + + // with null characters: + expected = new Object[] { + Long.valueOf((long)TEST_NUMBER), + Double.valueOf(TEST_NUMBER), + null, + new byte[] {1, 2, 3, 4, 5} }; + data.setContent( + Long.valueOf((long)TEST_NUMBER) + "," + + Double.valueOf(TEST_NUMBER) + "," + + "null" + "," + + Arrays.toString(new byte[] {1, 2, 3, 4, 5}), + Data.CSV_RECORD); + actual = (Object[])data.getContent(Data.ARRAY_RECORD); + assertEquals(expected.length, actual.length); + for (int c=0; c<expected.length; c++) { + assertEquals(expected[c], actual[c]); + } + } + + public static void assertEquals(Object expected, Object actual) { + if (expected instanceof byte[]) { + assertEquals(Arrays.toString((byte[])expected), + Arrays.toString((byte[])actual)); + } else { + TestCase.assertEquals(expected, actual); + } + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/2b214cdd/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java index 25e83a2..1447e00 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java @@ -19,6 +19,7 @@ package org.apache.sqoop.job.mr; import org.apache.hadoop.mapred.JobConf; import 
org.apache.hadoop.mapreduce.Job; +import org.apache.sqoop.common.Direction; import org.apache.sqoop.model.ConfigurationClass; import org.apache.sqoop.model.Form; import org.apache.sqoop.model.FormClass; @@ -41,140 +42,139 @@ import static org.mockito.Mockito.when; */ public class TestConfigurationUtils { -// Job job; -// JobConf jobConf; -// -// @Before -// public void setUp() throws Exception { -// setUpJob(); -// setUpJobConf(); -// } -// -// public void setUpJob() throws Exception { -// job = new Job(); -// } -// -// public void setUpJobConf() throws Exception { -// jobConf = spy(new JobConf(job.getConfiguration())); -// when(jobConf.getCredentials()).thenReturn(job.getCredentials()); -// } -// -// @Test -// public void testJobType() throws Exception { -// ConfigurationUtils.setJobType(job.getConfiguration(), MJob.Type.IMPORT); -// setUpJobConf(); -// assertEquals(MJob.Type.IMPORT, ConfigurationUtils.getJobType(jobConf)); -// } -// -// @Test -// public void testConfigConnectorConnection() throws Exception { -// ConfigurationUtils.setConfigFromConnectorConnection(job, getConfig()); -// setUpJobConf(); -// assertEquals(getConfig(), ConfigurationUtils.getConfigFromConnectorConnection(jobConf)); -// } -// -// @Test -// public void testConfigConnectorJob() throws Exception { -// ConfigurationUtils.setConfigFromConnectorJob(job, getConfig()); -// setUpJobConf(); -// assertEquals(getConfig(), ConfigurationUtils.getConfigFromConnectorJob(jobConf)); -// } -// -// @Test -// public void testConfigFrameworkConnection() throws Exception { -// ConfigurationUtils.setConfigFrameworkConnection(job, getConfig()); -// setUpJobConf(); -// assertEquals(getConfig(), ConfigurationUtils.getConfigFrameworkConnection(jobConf)); -// } -// -// @Test -// public void testConfigFrameworkJob() throws Exception { -// ConfigurationUtils.setFrameworkJobConfig(job, getConfig()); -// setUpJobConf(); -// assertEquals(getConfig(), ConfigurationUtils.getFrameworkJobConfig(jobConf)); -// } -// -// 
@Test -// public void testConnectorSchema() throws Exception { -// ConfigurationUtils.setConnectorSchema(job, getSchema("a")); -// assertEquals(getSchema("a"), ConfigurationUtils.getFromConnectorSchema(jobConf)); -// } -// -// @Test -// public void testConnectorSchemaNull() throws Exception { -// ConfigurationUtils.setConnectorSchema(job, null); -// assertNull(ConfigurationUtils.getFromConnectorSchema(jobConf)); -// } -// -// @Test -// public void testHioSchema() throws Exception { -// ConfigurationUtils.setHioSchema(job, getSchema("a")); -// assertEquals(getSchema("a"), ConfigurationUtils.getHioSchema(jobConf)); -// } -// -// @Test -// public void testHioSchemaNull() throws Exception { -// ConfigurationUtils.setHioSchema(job, null); -// assertNull(ConfigurationUtils.getHioSchema(jobConf)); -// } -// -// private Schema getSchema(String name) { -// return new Schema(name).addColumn(new Text("c1")); -// } -// -// private Config getConfig() { -// Config c = new Config(); -// c.f.A = "This is secret text!"; -// return c; -// } -// -// @FormClass -// public static class F { -// -// @Input String A; -// -// @Override -// public boolean equals(Object o) { -// if (this == o) return true; -// if (!(o instanceof F)) return false; -// -// F f = (F) o; -// -// if (A != null ? !A.equals(f.A) : f.A != null) return false; -// -// return true; -// } -// -// @Override -// public int hashCode() { -// return A != null ? A.hashCode() : 0; -// } -// } -// -// @ConfigurationClass -// public static class Config { -// @Form F f; -// -// public Config() { -// f = new F(); -// } -// -// @Override -// public boolean equals(Object o) { -// if (this == o) return true; -// if (!(o instanceof Config)) return false; -// -// Config config = (Config) o; -// -// if (f != null ? !f.equals(config.f) : config.f != null) -// return false; -// -// return true; -// } -// -// @Override -// public int hashCode() { -// return f != null ? 
f.hashCode() : 0; -// } -// } + Job job; + JobConf jobConf; + + @Before + public void setUp() throws Exception { + setUpJob(); + setUpJobConf(); + } + + public void setUpJob() throws Exception { + job = new Job(); + } + + public void setUpJobConf() throws Exception { + jobConf = spy(new JobConf(job.getConfiguration())); + when(jobConf.getCredentials()).thenReturn(job.getCredentials()); + } + + @Test + public void testConfigConnectorConnection() throws Exception { + ConfigurationUtils.setConnectorConnectionConfig(Direction.FROM, job, getConfig()); + setUpJobConf(); + assertEquals(getConfig(), ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, jobConf)); + + ConfigurationUtils.setConnectorConnectionConfig(Direction.TO, job, getConfig()); + setUpJobConf(); + assertEquals(getConfig(), ConfigurationUtils.getConnectorConnectionConfig(Direction.TO, jobConf)); + } + + @Test + public void testConfigConnectorJob() throws Exception { + ConfigurationUtils.setConnectorJobConfig(Direction.FROM, job, getConfig()); + setUpJobConf(); + assertEquals(getConfig(), ConfigurationUtils.getConnectorJobConfig(Direction.FROM, jobConf)); + + ConfigurationUtils.setConnectorJobConfig(Direction.TO, job, getConfig()); + setUpJobConf(); + assertEquals(getConfig(), ConfigurationUtils.getConnectorJobConfig(Direction.TO, jobConf)); + } + + @Test + public void testConfigFrameworkConnection() throws Exception { + ConfigurationUtils.setFrameworkConnectionConfig(Direction.FROM, job, getConfig()); + setUpJobConf(); + assertEquals(getConfig(), ConfigurationUtils.getFrameworkConnectionConfig(Direction.FROM, jobConf)); + + ConfigurationUtils.setFrameworkConnectionConfig(Direction.TO, job, getConfig()); + setUpJobConf(); + assertEquals(getConfig(), ConfigurationUtils.getFrameworkConnectionConfig(Direction.TO, jobConf)); + } + + @Test + public void testConfigFrameworkJob() throws Exception { + ConfigurationUtils.setFrameworkJobConfig(job, getConfig()); + setUpJobConf(); + 
assertEquals(getConfig(), ConfigurationUtils.getFrameworkJobConfig(jobConf)); + } + + @Test + public void testConnectorSchema() throws Exception { + ConfigurationUtils.setConnectorSchema(Direction.FROM, job, getSchema("a")); + assertEquals(getSchema("a"), ConfigurationUtils.getConnectorSchema(Direction.FROM, jobConf)); + + ConfigurationUtils.setConnectorSchema(Direction.TO, job, getSchema("b")); + assertEquals(getSchema("b"), ConfigurationUtils.getConnectorSchema(Direction.TO, jobConf)); + } + + @Test + public void testConnectorSchemaNull() throws Exception { + ConfigurationUtils.setConnectorSchema(Direction.FROM, job, null); + assertNull(ConfigurationUtils.getConnectorSchema(Direction.FROM, jobConf)); + + ConfigurationUtils.setConnectorSchema(Direction.TO, job, null); + assertNull(ConfigurationUtils.getConnectorSchema(Direction.TO, jobConf)); /* read back TO, the direction that was just set to null; FROM is already checked above */ + } + + private Schema getSchema(String name) { + return new Schema(name).addColumn(new Text("c1")); + } + + private Config getConfig() { + Config c = new Config(); + c.f.A = "This is secret text!"; + return c; + } + + @FormClass + public static class F { + + @Input String A; + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof F)) return false; + + F f = (F) o; + + if (A != null ? !A.equals(f.A) : f.A != null) return false; + + return true; + } + + @Override + public int hashCode() { + return A != null ? A.hashCode() : 0; + } + } + + @ConfigurationClass + public static class Config { + @Form F f; + + public Config() { + f = new F(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof Config)) return false; + + Config config = (Config) o; + + if (f != null ? !f.equals(config.f) : config.f != null) + return false; + + return true; + } + + @Override + public int hashCode() { + return f != null ? 
f.hashCode() : 0; + } + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/2b214cdd/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java index c28a39e..c2ebd7e 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java @@ -23,12 +23,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.sqoop.common.SqoopException; -//import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; -//import org.apache.sqoop.connector.idf.IntermediateDataFormat; +import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; +import org.apache.sqoop.connector.idf.IntermediateDataFormat; import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.etl.Loader; import org.apache.sqoop.job.etl.LoaderContext; -import org.apache.sqoop.model.MJob; import org.apache.sqoop.job.io.SqoopWritable; import org.junit.Before; import org.junit.Test; @@ -39,185 +38,182 @@ import java.util.concurrent.TimeUnit; public class TestSqoopOutputFormatLoadExecutor { -// private Configuration conf; -// -// public static class ThrowingLoader extends Loader { -// -// public ThrowingLoader() { -// -// } -// -// @Override -// public void load(LoaderContext context, Object cc, Object jc) throws Exception { -// context.getDataReader().readTextRecord(); -// throw new BrokenBarrierException(); -// } -// } -// -// public static class ThrowingContinuousLoader extends Loader { -// -// public ThrowingContinuousLoader() { -// } -// -// 
@Override -// public void load(LoaderContext context, Object cc, Object jc) throws Exception { -// int runCount = 0; -// Object o; -// String[] arr; -// while ((o = context.getDataReader().readTextRecord()) != null) { -// arr = o.toString().split(","); -// Assert.assertEquals(100, arr.length); -// for (int i = 0; i < arr.length; i++) { -// Assert.assertEquals(i, Integer.parseInt(arr[i])); -// } -// runCount++; -// if (runCount == 5) { -// throw new ConcurrentModificationException(); -// } -// } -// } -// } -// -// public static class GoodLoader extends Loader { -// -// public GoodLoader() { -// -// } -// -// @Override -// public void load(LoaderContext context, Object cc, Object jc) throws Exception { -// String[] arr = context.getDataReader().readTextRecord().toString().split(","); -// Assert.assertEquals(100, arr.length); -// for (int i = 0; i < arr.length; i++) { -// Assert.assertEquals(i, Integer.parseInt(arr[i])); -// } -// } -// } -// -// public static class GoodContinuousLoader extends Loader { -// -// public GoodContinuousLoader() { -// -// } -// -// @Override -// public void load(LoaderContext context, Object cc, Object jc) throws Exception { -// int runCount = 0; -// Object o; -// String[] arr; -// while ((o = context.getDataReader().readTextRecord()) != null) { -// arr = o.toString().split(","); -// Assert.assertEquals(100, arr.length); -// for (int i = 0; i < arr.length; i++) { -// Assert.assertEquals(i, Integer.parseInt(arr[i])); -// } -// runCount++; -// } -// Assert.assertEquals(10, runCount); -// } -// } -// -// -// @Before -// public void setUp() { -// conf = new Configuration(); -// conf.setIfUnset(JobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); -// -// } -// -// @Test(expected = BrokenBarrierException.class) -// public void testWhenLoaderThrows() throws Throwable { -// ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT); -// conf.set(JobConstants.JOB_ETL_LOADER, ThrowingLoader.class.getName()); -// 
SqoopOutputFormatLoadExecutor executor = new -// SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName()); -// RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter(); -// IntermediateDataFormat data = new CSVIntermediateDataFormat(); -// SqoopWritable writable = new SqoopWritable(); -// try { -// for (int count = 0; count < 100; count++) { -// data.setTextData(String.valueOf(count)); -// writable.setString(data.getTextData()); -// writer.write(writable, null); -// } -// } catch (SqoopException ex) { -// throw ex.getCause(); -// } -// } -// -// @Test -// public void testSuccessfulContinuousLoader() throws Throwable { -// ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT); -// conf.set(JobConstants.JOB_ETL_LOADER, GoodContinuousLoader.class.getName()); -// SqoopOutputFormatLoadExecutor executor = new -// SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName()); -// RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter(); -// IntermediateDataFormat data = new CSVIntermediateDataFormat(); -// SqoopWritable writable = new SqoopWritable(); -// for (int i = 0; i < 10; i++) { -// StringBuilder builder = new StringBuilder(); -// for (int count = 0; count < 100; count++) { -// builder.append(String.valueOf(count)); -// if (count != 99) { -// builder.append(","); -// } -// } -// data.setTextData(builder.toString()); -// writable.setString(data.getTextData()); -// writer.write(writable, null); -// } -// writer.close(null); -// } -// -// @Test (expected = SqoopException.class) -// public void testSuccessfulLoader() throws Throwable { -// SqoopOutputFormatLoadExecutor executor = new -// SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName()); -// RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter(); -// IntermediateDataFormat data = new CSVIntermediateDataFormat(); -// SqoopWritable writable = new SqoopWritable(); -// StringBuilder builder = new StringBuilder(); -// for 
(int count = 0; count < 100; count++) { -// builder.append(String.valueOf(count)); -// if (count != 99) { -// builder.append(","); -// } -// } -// data.setTextData(builder.toString()); -// writable.setString(data.getTextData()); -// writer.write(writable, null); -// -// //Allow writer to complete. -// TimeUnit.SECONDS.sleep(5); -// writer.close(null); -// } -// -// -// @Test(expected = ConcurrentModificationException.class) -// public void testThrowingContinuousLoader() throws Throwable { -// ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT); -// conf.set(JobConstants.JOB_ETL_LOADER, ThrowingContinuousLoader.class.getName()); -// SqoopOutputFormatLoadExecutor executor = new -// SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName()); -// RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter(); -// IntermediateDataFormat data = new CSVIntermediateDataFormat(); -// SqoopWritable writable = new SqoopWritable(); -// try { -// for (int i = 0; i < 10; i++) { -// StringBuilder builder = new StringBuilder(); -// for (int count = 0; count < 100; count++) { -// builder.append(String.valueOf(count)); -// if (count != 99) { -// builder.append(","); -// } -// } -// data.setTextData(builder.toString()); -// writable.setString(data.getTextData()); -// writer.write(writable, null); -// } -// writer.close(null); -// } catch (SqoopException ex) { -// throw ex.getCause(); -// } -// } + private Configuration conf; + + public static class ThrowingLoader extends Loader { + + public ThrowingLoader() { + + } + + @Override + public void load(LoaderContext context, Object cc, Object jc) throws Exception { + context.getDataReader().readTextRecord(); + throw new BrokenBarrierException(); + } + } + + public static class ThrowingContinuousLoader extends Loader { + + public ThrowingContinuousLoader() { + } + + @Override + public void load(LoaderContext context, Object cc, Object jc) throws Exception { + int runCount = 0; + Object o; + String[] arr; 
+ while ((o = context.getDataReader().readTextRecord()) != null) { + arr = o.toString().split(","); + Assert.assertEquals(100, arr.length); + for (int i = 0; i < arr.length; i++) { + Assert.assertEquals(i, Integer.parseInt(arr[i])); + } + runCount++; + if (runCount == 5) { + throw new ConcurrentModificationException(); + } + } + } + } + + public static class GoodLoader extends Loader { + + public GoodLoader() { + + } + + @Override + public void load(LoaderContext context, Object cc, Object jc) throws Exception { + String[] arr = context.getDataReader().readTextRecord().toString().split(","); + Assert.assertEquals(100, arr.length); + for (int i = 0; i < arr.length; i++) { + Assert.assertEquals(i, Integer.parseInt(arr[i])); + } + } + } + + public static class GoodContinuousLoader extends Loader { + + public GoodContinuousLoader() { + + } + + @Override + public void load(LoaderContext context, Object cc, Object jc) throws Exception { + int runCount = 0; + Object o; + String[] arr; + while ((o = context.getDataReader().readTextRecord()) != null) { + arr = o.toString().split(","); + Assert.assertEquals(100, arr.length); + for (int i = 0; i < arr.length; i++) { + Assert.assertEquals(i, Integer.parseInt(arr[i])); + } + runCount++; + } + Assert.assertEquals(10, runCount); + } + } + + + @Before + public void setUp() { + conf = new Configuration(); + conf.setIfUnset(JobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); + + } + + @Test(expected = BrokenBarrierException.class) + public void testWhenLoaderThrows() throws Throwable { + conf.set(JobConstants.JOB_ETL_LOADER, ThrowingLoader.class.getName()); + SqoopOutputFormatLoadExecutor executor = new + SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName()); + RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter(); + IntermediateDataFormat data = new CSVIntermediateDataFormat(); + SqoopWritable writable = new SqoopWritable(); + try { + for (int count = 0; count < 
100; count++) { + data.setTextData(String.valueOf(count)); + writable.setString(data.getTextData()); + writer.write(writable, null); + } + } catch (SqoopException ex) { + throw ex.getCause(); + } + } + + @Test + public void testSuccessfulContinuousLoader() throws Throwable { + conf.set(JobConstants.JOB_ETL_LOADER, GoodContinuousLoader.class.getName()); + SqoopOutputFormatLoadExecutor executor = new + SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName()); + RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter(); + IntermediateDataFormat data = new CSVIntermediateDataFormat(); + SqoopWritable writable = new SqoopWritable(); + for (int i = 0; i < 10; i++) { + StringBuilder builder = new StringBuilder(); + for (int count = 0; count < 100; count++) { + builder.append(String.valueOf(count)); + if (count != 99) { + builder.append(","); + } + } + data.setTextData(builder.toString()); + writable.setString(data.getTextData()); + writer.write(writable, null); + } + writer.close(null); + } + + @Test (expected = SqoopException.class) + public void testSuccessfulLoader() throws Throwable { + SqoopOutputFormatLoadExecutor executor = new + SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName()); + RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter(); + IntermediateDataFormat data = new CSVIntermediateDataFormat(); + SqoopWritable writable = new SqoopWritable(); + StringBuilder builder = new StringBuilder(); + for (int count = 0; count < 100; count++) { + builder.append(String.valueOf(count)); + if (count != 99) { + builder.append(","); + } + } + data.setTextData(builder.toString()); + writable.setString(data.getTextData()); + writer.write(writable, null); + + //Allow writer to complete. 
+ TimeUnit.SECONDS.sleep(5); + writer.close(null); + } + + + @Test(expected = ConcurrentModificationException.class) + public void testThrowingContinuousLoader() throws Throwable { + conf.set(JobConstants.JOB_ETL_LOADER, ThrowingContinuousLoader.class.getName()); + SqoopOutputFormatLoadExecutor executor = new + SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName()); + RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter(); + IntermediateDataFormat data = new CSVIntermediateDataFormat(); + SqoopWritable writable = new SqoopWritable(); + try { + for (int i = 0; i < 10; i++) { + StringBuilder builder = new StringBuilder(); + for (int count = 0; count < 100; count++) { + builder.append(String.valueOf(count)); + if (count != 99) { + builder.append(","); + } + } + data.setTextData(builder.toString()); + writable.setString(data.getTextData()); + writer.write(writable, null); + } + writer.close(null); + } catch (SqoopException ex) { + throw ex.getCause(); + } + } }
