Repository: sqoop Updated Branches: refs/heads/sqoop2 68577fbf7 -> cb8214806
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestMRConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestMRConfigurationUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestMRConfigurationUtils.java new file mode 100644 index 0000000..972b555 --- /dev/null +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestMRConfigurationUtils.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.job.mr; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.sqoop.common.Direction; +import org.apache.sqoop.model.ConfigurationClass; +import org.apache.sqoop.model.Config; +import org.apache.sqoop.model.ConfigClass; +import org.apache.sqoop.model.Input; +import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.Text; +import org.junit.Before; +import org.junit.Test; + +/** + * Current tests are using mockito to propagate credentials from hadoop Job object + * to hadoop JobConf object. This implementation was chosen because it's not clear + * how MapReduce is converting one object to another. + */ +public class TestMRConfigurationUtils { + + Job job; + JobConf jobConfSpy; + + @Before + public void setUp() throws Exception { + setUpHadoopJob(); + setUpHadoopJobConf(); + } + + public void setUpHadoopJob() throws Exception { + job = new Job(); + } + + public void setUpHadoopJobConf() throws Exception { + jobConfSpy = spy(new JobConf(job.getConfiguration())); + when(jobConfSpy.getCredentials()).thenReturn(job.getCredentials()); + } + + @Test + public void testLinkConfiguration() throws Exception { + MRConfigurationUtils.setConnectorLinkConfig(Direction.FROM, job, getConfig()); + setUpHadoopJobConf(); + assertEquals(getConfig(), MRConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, jobConfSpy)); + + MRConfigurationUtils.setConnectorLinkConfig(Direction.TO, job, getConfig()); + setUpHadoopJobConf(); + assertEquals(getConfig(), MRConfigurationUtils.getConnectorConnectionConfig(Direction.TO, jobConfSpy)); + } + + @Test + public void testJobConfiguration() throws Exception { + MRConfigurationUtils.setConnectorJobConfig(Direction.FROM, job, getConfig()); + setUpHadoopJobConf(); + 
assertEquals(getConfig(), MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, jobConfSpy)); + + MRConfigurationUtils.setConnectorJobConfig(Direction.TO, job, getConfig()); + setUpHadoopJobConf(); + assertEquals(getConfig(), MRConfigurationUtils.getConnectorJobConfig(Direction.TO, jobConfSpy)); + } + + @Test + public void testDriverConfiguration() throws Exception { + MRConfigurationUtils.setDriverConfig(job, getConfig()); + setUpHadoopJobConf(); + assertEquals(getConfig(), MRConfigurationUtils.getDriverConfig(jobConfSpy)); + } + + @Test + public void testConnectorSchema() throws Exception { + MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, getSchema("a")); + assertEquals(getSchema("a"), MRConfigurationUtils.getConnectorSchema(Direction.FROM, jobConfSpy)); + + MRConfigurationUtils.setConnectorSchema(Direction.TO, job, getSchema("b")); + assertEquals(getSchema("b"), MRConfigurationUtils.getConnectorSchema(Direction.TO, jobConfSpy)); + } + + @Test + public void testConnectorSchemaNull() throws Exception { + MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, null); + assertNull(MRConfigurationUtils.getConnectorSchema(Direction.FROM, jobConfSpy)); + + MRConfigurationUtils.setConnectorSchema(Direction.TO, job, null); + assertNull(MRConfigurationUtils.getConnectorSchema(Direction.TO, jobConfSpy)); /* check TO, the direction just set to null */ + } + + private Schema getSchema(String name) { + return new Schema(name).addColumn(new Text("c1")); + } + + private TestConfiguration getConfig() { + TestConfiguration c = new TestConfiguration(); + c.c.A = "This is secret text!"; + return c; + } + + @ConfigClass + public static class C { + + @Input String A; + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof C)) return false; + + C c = (C) o; + + if (A != null ? !A.equals(c.A) : c.A != null) return false; + + return true; + } + + @Override + public int hashCode() { + return A != null ?
A.hashCode() : 0; + } + } + + @ConfigurationClass + public static class TestConfiguration { + @Config C c; + + public TestConfiguration() { + c = new C(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof TestConfiguration)) return false; + + TestConfiguration config = (TestConfiguration) o; + + if (c != null ? !c.equals(config.c) : config.c != null) + return false; + + return true; + } + + @Override + public int hashCode() { + return c != null ? c.hashCode() : 0; + } + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java index 1f411d2..5bd11f0 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java @@ -24,7 +24,7 @@ import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; import org.apache.sqoop.connector.idf.IntermediateDataFormat; -import org.apache.sqoop.job.JobConstants; +import org.apache.sqoop.job.MRJobConstants; import org.apache.sqoop.job.etl.Loader; import org.apache.sqoop.job.etl.LoaderContext; import org.apache.sqoop.job.io.SqoopWritable; @@ -120,13 +120,13 @@ public class TestSqoopOutputFormatLoadExecutor { @Before public void setUp() { conf = new Configuration(); - conf.setIfUnset(JobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); + conf.setIfUnset(MRJobConstants.INTERMEDIATE_DATA_FORMAT, 
CSVIntermediateDataFormat.class.getName()); } @Test(expected = BrokenBarrierException.class) public void testWhenLoaderThrows() throws Throwable { - conf.set(JobConstants.JOB_ETL_LOADER, ThrowingLoader.class.getName()); + conf.set(MRJobConstants.JOB_ETL_LOADER, ThrowingLoader.class.getName()); SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName()); RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter(); @@ -145,7 +145,7 @@ public class TestSqoopOutputFormatLoadExecutor { @Test public void testSuccessfulContinuousLoader() throws Throwable { - conf.set(JobConstants.JOB_ETL_LOADER, GoodContinuousLoader.class.getName()); + conf.set(MRJobConstants.JOB_ETL_LOADER, GoodContinuousLoader.class.getName()); SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName()); RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter(); @@ -192,7 +192,7 @@ public class TestSqoopOutputFormatLoadExecutor { @Test(expected = ConcurrentModificationException.class) public void testThrowingContinuousLoader() throws Throwable { - conf.set(JobConstants.JOB_ETL_LOADER, ThrowingContinuousLoader.class.getName()); + conf.set(MRJobConstants.JOB_ETL_LOADER, ThrowingContinuousLoader.class.getName()); SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName()); RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java ---------------------------------------------------------------------- diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java 
b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java index 0c492ef..646e8cb 100644 --- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java +++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java @@ -39,8 +39,8 @@ import org.apache.sqoop.driver.SubmissionEngine; import org.apache.sqoop.execution.mapreduce.MRJobRequest; import org.apache.sqoop.execution.mapreduce.MapreduceExecutionEngine; import org.apache.sqoop.driver.JobRequest; -import org.apache.sqoop.job.JobConstants; -import org.apache.sqoop.job.mr.ConfigurationUtils; +import org.apache.sqoop.job.MRJobConstants; +import org.apache.sqoop.job.mr.MRConfigurationUtils; import org.apache.sqoop.submission.SubmissionStatus; import org.apache.sqoop.submission.counter.Counter; import org.apache.sqoop.submission.counter.CounterGroup; @@ -172,7 +172,7 @@ public class MapreduceSubmissionEngine extends SubmissionEngine { continue; } configuration.set( - JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT + entry.getKey(), + MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT + entry.getKey(), entry.getValue()); } @@ -182,7 +182,7 @@ public class MapreduceSubmissionEngine extends SubmissionEngine { continue; } configuration.set( - JobConstants.PREFIX_CONNECTOR_TO_CONTEXT + entry.getKey(), + MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT + entry.getKey(), entry.getValue()); } @@ -202,17 +202,17 @@ public class MapreduceSubmissionEngine extends SubmissionEngine { Job job = new Job(configuration); // link configs - ConfigurationUtils.setConnectorLinkConfig(Direction.FROM, job, request.getConnectorLinkConfig(Direction.FROM)); - ConfigurationUtils.setConnectorLinkConfig(Direction.TO, job, request.getConnectorLinkConfig(Direction.TO)); + MRConfigurationUtils.setConnectorLinkConfig(Direction.FROM, job, request.getConnectorLinkConfig(Direction.FROM)); + 
MRConfigurationUtils.setConnectorLinkConfig(Direction.TO, job, request.getConnectorLinkConfig(Direction.TO)); // from and to configs - ConfigurationUtils.setConnectorJobConfig(Direction.FROM, job, request.getJobConfig(Direction.FROM)); - ConfigurationUtils.setConnectorJobConfig(Direction.TO, job, request.getJobConfig(Direction.TO)); + MRConfigurationUtils.setConnectorJobConfig(Direction.FROM, job, request.getJobConfig(Direction.FROM)); + MRConfigurationUtils.setConnectorJobConfig(Direction.TO, job, request.getJobConfig(Direction.TO)); - ConfigurationUtils.setDriverConfig(job, request.getDriverConfig()); + MRConfigurationUtils.setDriverConfig(job, request.getDriverConfig()); // @TODO(Abe): Persist TO schema. - ConfigurationUtils.setConnectorSchema(Direction.FROM, job, request.getSummary().getFromSchema()); - ConfigurationUtils.setConnectorSchema(Direction.TO, job, request.getSummary().getToSchema()); + MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, request.getSummary().getFromSchema()); + MRConfigurationUtils.setConnectorSchema(Direction.TO, job, request.getSummary().getToSchema()); if(request.getJobName() != null) { job.setJobName("Sqoop: " + request.getJobName());
