http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
new file mode 100644
index 0000000..d236148
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.sqoop.job.io.Data;
+
+/**
+ * A reducer to perform reduce function.
+ */
+public class SqoopReducer
+    extends Reducer<Data, NullWritable, Data, NullWritable> {
+
+  public static final Log LOG =
+      LogFactory.getLog(SqoopReducer.class.getName());
+
+}
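SqoopReducer above adds no reduce() override, so it inherits Hadoop's default identity behaviour: every Data/NullWritable pair coming out of the shuffle is written to the output format unchanged. Purely for illustration (not part of this commit), an explicit override equivalent to that inherited default would look roughly like this inside the class:

  @Override
  protected void reduce(Data key, Iterable<NullWritable> values, Context context)
      throws IOException, InterruptedException {
    // Same as the inherited default: emit each record unchanged.
    for (NullWritable value : values) {
      context.write(key, value);
    }
  }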
http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java
new file mode 100644
index 0000000..7dc9541
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.utils.ClassUtils;
+
+/**
+ * An input split to be read.
+ */
+public class SqoopSplit extends InputSplit implements Writable {
+
+  private Partition partition;
+
+  public void setPartition(Partition partition) {
+    this.partition = partition;
+  }
+
+  public Partition getPartition() {
+    return partition;
+  }
+
+  @Override
+  public long getLength() throws IOException {
+    return 0;
+  }
+
+  @Override
+  public String[] getLocations() throws IOException {
+    return new String[] {};
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // read Partition class name
+    String className = in.readUTF();
+    // instantiate Partition object
+    Class<?> clz = ClassUtils.loadClass(className);
+    if (clz == null) {
+      throw new SqoopException(CoreError.CORE_0009, className);
+    }
+    try {
+      partition = (Partition) clz.newInstance();
+    } catch (Exception e) {
+      throw new SqoopException(CoreError.CORE_0010, className, e);
+    }
+    // read Partition object content
+    partition.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // write Partition class name
+    out.writeUTF(partition.getClass().getName());
+    // write Partition object content
+    partition.write(out);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/test/java/org/apache/sqoop/job/FileUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/FileUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/FileUtils.java
new file mode 100644
index 0000000..e685883
--- /dev/null
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/FileUtils.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.job; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +public class FileUtils { + + public static boolean exists(String file) throws IOException { + Path path = new Path(file); + FileSystem fs = path.getFileSystem(new Configuration()); + return fs.exists(path); + } + + public static void delete(String file) throws IOException { + Path path = new Path(file); + FileSystem fs = path.getFileSystem(new Configuration()); + if (fs.exists(path)) { + fs.delete(path, true); + } + } + + public static void mkdirs(String directory) throws IOException { + Path path = new Path(directory); + FileSystem fs = path.getFileSystem(new Configuration()); + if (!fs.exists(path)) { + fs.mkdirs(path); + } + } + + public static InputStream open(String fileName) + throws IOException, ClassNotFoundException { + Path filepath = new Path(fileName); + FileSystem fs = filepath.getFileSystem(new Configuration()); + return fs.open(filepath); + } + + public static OutputStream create(String fileName) throws IOException { + Path filepath = new Path(fileName); + FileSystem fs = filepath.getFileSystem(new Configuration()); + return fs.create(filepath, false); + } + + private FileUtils() { + // Disable explicit object creation + } + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java new file mode 100644 index 0000000..e6ead3f --- /dev/null +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.job; + +import java.io.IOException; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.sqoop.job.io.Data; +import org.apache.sqoop.job.mr.SqoopFileOutputFormat; +import org.apache.sqoop.job.mr.SqoopInputFormat; +import org.apache.sqoop.job.mr.SqoopMapper; +import org.apache.sqoop.job.mr.SqoopNullOutputFormat; +import org.apache.sqoop.job.mr.SqoopSplit; + +public class JobUtils { + + public static void runJob(Configuration conf) + throws IOException, InterruptedException, ClassNotFoundException { + runJob(conf, SqoopInputFormat.class, SqoopMapper.class, + (conf.get(FileOutputFormat.OUTDIR) != null) ? + SqoopFileOutputFormat.class : SqoopNullOutputFormat.class); + } + + public static void runJob(Configuration conf, + Class<? extends InputFormat<SqoopSplit, NullWritable>> input, + Class<? extends Mapper<SqoopSplit, NullWritable, Data, NullWritable>> mapper, + Class<? extends OutputFormat<Data, NullWritable>> output) + throws IOException, InterruptedException, ClassNotFoundException { + Job job = Job.getInstance(conf); + job.setInputFormatClass(input); + job.setMapperClass(mapper); + job.setMapOutputKeyClass(Data.class); + job.setMapOutputValueClass(NullWritable.class); + job.setOutputFormatClass(output); + job.setOutputKeyClass(Data.class); + job.setOutputValueClass(NullWritable.class); + + boolean success = job.waitForCompletion(true); + Assert.assertEquals("Job failed!", true, success); + } + + private JobUtils() { + // Disable explicit object creation + } + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java new file mode 100644 index 0000000..c74faa2 --- /dev/null +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.job; + +import java.io.BufferedReader; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.LinkedList; +import java.util.List; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.sqoop.job.etl.Extractor; +import org.apache.sqoop.job.etl.HdfsSequenceImportLoader; +import org.apache.sqoop.job.etl.HdfsTextImportLoader; +import org.apache.sqoop.job.etl.Partition; +import org.apache.sqoop.job.etl.Partitioner; +import org.apache.sqoop.job.io.Data; +import org.apache.sqoop.job.io.DataWriter; +import org.apache.sqoop.job.mr.SqoopFileOutputFormat; +import org.junit.Test; + +public class TestHdfsLoad extends TestCase { + + private static final String OUTPUT_ROOT = "/tmp/sqoop/warehouse/"; + private static final String OUTPUT_FILE = "part-r-00000"; + private static final int START_ID = 1; + private static final int NUMBER_OF_IDS = 9; + private static final int NUMBER_OF_ROWS_PER_ID = 10; + + private String outdir; + + public TestHdfsLoad() { + outdir = OUTPUT_ROOT + "/" + getClass().getSimpleName(); + } + + public void testVoid() {} + /* + @Test + public void testUncompressedText() throws Exception { + FileUtils.delete(outdir); + + Configuration conf = new Configuration(); + conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); + conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); + conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName()); + conf.set(FileOutputFormat.OUTDIR, outdir); + JobUtils.runJob(conf); + + String fileName = outdir + "/" + OUTPUT_FILE; + InputStream filestream = FileUtils.open(fileName); + BufferedReader filereader = new BufferedReader(new InputStreamReader( + filestream, Data.CHARSET_NAME)); + verifyOutputText(filereader); + } + + @Test + public void testCompressedText() throws Exception { + FileUtils.delete(outdir); + + Configuration conf = new Configuration(); + conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); + conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); + conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName()); + conf.set(FileOutputFormat.OUTDIR, outdir); + conf.setBoolean(FileOutputFormat.COMPRESS, true); + JobUtils.runJob(conf); + + Class<? 
extends CompressionCodec> codecClass = conf.getClass( + FileOutputFormat.COMPRESS_CODEC, SqoopFileOutputFormat.DEFAULT_CODEC) + .asSubclass(CompressionCodec.class); + CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf); + String fileName = outdir + "/" + OUTPUT_FILE + codec.getDefaultExtension(); + InputStream filestream = codec.createInputStream(FileUtils.open(fileName)); + BufferedReader filereader = new BufferedReader(new InputStreamReader( + filestream, Data.CHARSET_NAME)); + verifyOutputText(filereader); + } + + private void verifyOutputText(BufferedReader reader) throws IOException { + String actual = null; + String expected; + Data data = new Data(); + int index = START_ID*NUMBER_OF_ROWS_PER_ID; + while ((actual = reader.readLine()) != null){ + data.setContent(new Object[] { + new Integer(index), new Double(index), String.valueOf(index) }, + Data.ARRAY_RECORD); + expected = data.toString(); + index++; + + assertEquals(expected, actual); + } + reader.close(); + + assertEquals(NUMBER_OF_IDS*NUMBER_OF_ROWS_PER_ID, + index-START_ID*NUMBER_OF_ROWS_PER_ID); + } + + @Test + public void testUncompressedSequence() throws Exception { + FileUtils.delete(outdir); + + Configuration conf = new Configuration(); + conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); + conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); + conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName()); + conf.set(FileOutputFormat.OUTDIR, outdir); + JobUtils.runJob(conf); + + Path filepath = new Path(outdir, + OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION); + SequenceFile.Reader filereader = new SequenceFile.Reader(conf, + SequenceFile.Reader.file(filepath)); + verifyOutputSequence(filereader); + } + + @Test + public void testCompressedSequence() throws Exception { + FileUtils.delete(outdir); + + Configuration conf = new Configuration(); + conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); + conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); + conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName()); + conf.set(FileOutputFormat.OUTDIR, outdir); + conf.setBoolean(FileOutputFormat.COMPRESS, true); + JobUtils.runJob(conf); + + Path filepath = new Path(outdir, + OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION); + SequenceFile.Reader filereader = new SequenceFile.Reader(conf, + SequenceFile.Reader.file(filepath)); + verifyOutputSequence(filereader); + } + + private void verifyOutputSequence(SequenceFile.Reader reader) throws IOException { + int index = START_ID*NUMBER_OF_ROWS_PER_ID; + Text actual = new Text(); + Text expected = new Text(); + Data data = new Data(); + while (reader.next(actual)){ + data.setContent(new Object[] { + new Integer(index), new Double(index), String.valueOf(index) }, + Data.ARRAY_RECORD); + expected.set(data.toString()); + index++; + + assertEquals(expected.toString(), actual.toString()); + } + reader.close(); + + assertEquals(NUMBER_OF_IDS*NUMBER_OF_ROWS_PER_ID, + index-START_ID*NUMBER_OF_ROWS_PER_ID); + } + + public static class DummyPartition extends Partition { + private int id; + + public void setId(int id) { + this.id = id; + } + + public int getId() { + return id; + } + + @Override + public void readFields(DataInput in) throws IOException { + id = in.readInt(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(id); + } + } + + public static class DummyPartitioner extends Partitioner { + 
@Override + public List<Partition> initialize(Context context) { + List<Partition> partitions = new LinkedList<Partition>(); + for (int id = START_ID; id <= NUMBER_OF_IDS; id++) { + DummyPartition partition = new DummyPartition(); + partition.setId(id); + partitions.add(partition); + } + return partitions; + } + } + + public static class DummyExtractor extends Extractor { + @Override + public void initialize(Context context, Partition partition, DataWriter writer) { + int id = ((DummyPartition)partition).getId(); + for (int row = 0; row < NUMBER_OF_ROWS_PER_ID; row++) { + Object[] array = new Object[] { + new Integer(id*NUMBER_OF_ROWS_PER_ID+row), + new Double(id*NUMBER_OF_ROWS_PER_ID+row), + String.valueOf(id*NUMBER_OF_ROWS_PER_ID+row) + }; + writer.writeArrayRecord(array); + } + } + } + */ +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestJobEngine.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestJobEngine.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestJobEngine.java new file mode 100644 index 0000000..51dddb4 --- /dev/null +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestJobEngine.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.job; + +import java.io.BufferedReader; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; +import java.util.ResourceBundle; + +import junit.framework.TestCase; + +import org.apache.sqoop.connector.spi.SqoopConnector; +import org.apache.sqoop.job.etl.Exporter; +import org.apache.sqoop.job.etl.Extractor; +import org.apache.sqoop.job.etl.Importer; +import org.apache.sqoop.job.etl.Initializer; +import org.apache.sqoop.job.etl.Partition; +import org.apache.sqoop.job.etl.Partitioner; +import org.apache.sqoop.job.io.Data; +import org.apache.sqoop.job.io.DataWriter; +import org.apache.sqoop.model.MConnectionForms; +import org.apache.sqoop.model.MJob.Type; +import org.apache.sqoop.model.MJobForms; +import org.apache.sqoop.validation.Validator; +import org.junit.Test; + +public class TestJobEngine extends TestCase { + + private static final String DATA_DIR = TestJobEngine.class.getSimpleName(); + private static final String WAREHOUSE_ROOT = "/tmp/sqoop/warehouse/"; + + private static final String OUTPUT_DIR = WAREHOUSE_ROOT + DATA_DIR; + private static final String OUTPUT_FILE = "part-r-00000"; + private static final int START_PARTITION = 1; + private static final int NUMBER_OF_PARTITIONS = 9; + private static final int NUMBER_OF_ROWS_PER_PARTITION = 10; + + public void testVoid() { } +/* + @Test + public void testImport() throws Exception { + FileUtils.delete(OUTPUT_DIR); + + DummyConnector connector = new DummyConnector(); + EtlOptions options = new EtlOptions(connector); + + JobEngine engine = new JobEngine(); + engine.initialize(options); + + String fileName = OUTPUT_DIR + "/" + OUTPUT_FILE; + InputStream filestream = FileUtils.open(fileName); + BufferedReader filereader = new BufferedReader(new InputStreamReader( + filestream, Data.CHARSET_NAME)); + verifyOutput(filereader); + } + + private void verifyOutput(BufferedReader reader) + throws IOException { + String line = null; + int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION; + Data expected = new Data(); + while ((line = reader.readLine()) != null){ + expected.setContent(new Object[] { + new Integer(index), + new Double(index), + String.valueOf(index) }, + Data.ARRAY_RECORD); + index++; + + assertEquals(expected.toString(), line); + } + reader.close(); + + assertEquals(NUMBER_OF_PARTITIONS*NUMBER_OF_ROWS_PER_PARTITION, + index-START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION); + } + + public class DummyConnector implements SqoopConnector { + + @Override + public Importer getImporter() { + return new Importer( + DummyImportInitializer.class, + DummyImportPartitioner.class, + DummyImportExtractor.class, + null); + } + + @Override + public Exporter getExporter() { + fail("This method should not be invoked."); + return null; + } + + @Override + public ResourceBundle getBundle(Locale locale) { + fail("This method should not be invoked."); + return null; + } + + @Override + public Validator getValidator() { + fail("This method should not be invoked."); + return null; + } + + @Override + public Class getConnectionConfigurationClass() { + fail("This method should not be invoked."); + return null; + } + + @Override + public Class getJobConfigurationClass(Type jobType) { + fail("This method should not be invoked."); + return null; + } + } + + public static class DummyImportInitializer extends Initializer { + @Override + public void 
initialize(MutableContext context, Options options) { + context.setString(Constants.JOB_ETL_OUTPUT_DIRECTORY, OUTPUT_DIR); + } + } + + public static class DummyImportPartition extends Partition { + private int id; + + public void setId(int id) { + this.id = id; + } + + public int getId() { + return id; + } + + @Override + public void readFields(DataInput in) throws IOException { + id = in.readInt(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(id); + } + } + + public static class DummyImportPartitioner extends Partitioner { + @Override + public List<Partition> initialize(Context context) { + List<Partition> partitions = new LinkedList<Partition>(); + for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) { + DummyImportPartition partition = new DummyImportPartition(); + partition.setId(id); + partitions.add(partition); + } + return partitions; + } + } + + public static class DummyImportExtractor extends Extractor { + @Override + public void initialize(Context context, Partition partition, DataWriter writer) { + int id = ((DummyImportPartition)partition).getId(); + for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) { + writer.writeArrayRecord(new Object[] { + new Integer(id*NUMBER_OF_ROWS_PER_PARTITION+row), + new Double(id*NUMBER_OF_ROWS_PER_PARTITION+row), + String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)}); + } + } + } +*/ +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java new file mode 100644 index 0000000..94ab560 --- /dev/null +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.job; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.sqoop.job.etl.Extractor; +import org.apache.sqoop.job.etl.Loader; +import org.apache.sqoop.job.etl.Partition; +import org.apache.sqoop.job.etl.Partitioner; +import org.apache.sqoop.job.io.Data; +import org.apache.sqoop.job.io.DataReader; +import org.apache.sqoop.job.io.DataWriter; +import org.apache.sqoop.job.mr.SqoopInputFormat; +import org.apache.sqoop.job.mr.SqoopMapper; +import org.apache.sqoop.job.mr.SqoopNullOutputFormat; +import org.apache.sqoop.job.mr.SqoopSplit; +import org.junit.Test; + +public class TestMapReduce extends TestCase { + + private static final int START_PARTITION = 1; + private static final int NUMBER_OF_PARTITIONS = 9; + private static final int NUMBER_OF_ROWS_PER_PARTITION = 10; + + public void testVoid() {} + + /* + @Test + public void testInputFormat() throws Exception { + Configuration conf = new Configuration(); + conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); + Job job = Job.getInstance(conf); + + SqoopInputFormat inputformat = new SqoopInputFormat(); + List<InputSplit> splits = inputformat.getSplits(job); + assertEquals(9, splits.size()); + + for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) { + SqoopSplit split = (SqoopSplit)splits.get(id-1); + DummyPartition partition = (DummyPartition)split.getPartition(); + assertEquals(id, partition.getId()); + } + } + + @Test + public void testMapper() throws Exception { + Configuration conf = new Configuration(); + conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); + conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); + + JobUtils.runJob(conf, SqoopInputFormat.class, SqoopMapper.class, + DummyOutputFormat.class); + } + + @Test + public void testOutputFormat() throws Exception { + Configuration conf = new Configuration(); + conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); + conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); + conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName()); + + JobUtils.runJob(conf, SqoopInputFormat.class, SqoopMapper.class, + SqoopNullOutputFormat.class); + } + + public static class DummyPartition extends Partition { + private int id; + + public void setId(int id) { + this.id = id; + } + + public int getId() { + return id; + } + + @Override + public void readFields(DataInput in) throws IOException { + id = in.readInt(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(id); + } + } + + public static class DummyPartitioner extends Partitioner { + @Override + public List<Partition> initialize(Context context) { + List<Partition> partitions = new LinkedList<Partition>(); + for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) { + DummyPartition partition = new DummyPartition(); + partition.setId(id); + partitions.add(partition); + } + 
return partitions; + } + } + + public static class DummyExtractor extends Extractor { + @Override + public void initialize(Context context, Partition partition, DataWriter writer) { + int id = ((DummyPartition)partition).getId(); + for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) { + writer.writeArrayRecord(new Object[] { + new Integer(id*NUMBER_OF_ROWS_PER_PARTITION+row), + new Double(id*NUMBER_OF_ROWS_PER_PARTITION+row), + String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)}); + } + } + } + + public static class DummyOutputFormat + extends OutputFormat<Data, NullWritable> { + @Override + public void checkOutputSpecs(JobContext context) { + // do nothing + } + + @Override + public RecordWriter<Data, NullWritable> getRecordWriter( + TaskAttemptContext context) { + return new DummyRecordWriter(); + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) { + return new DummyOutputCommitter(); + } + + public static class DummyRecordWriter + extends RecordWriter<Data, NullWritable> { + private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION; + private Data data = new Data(); + + @Override + public void write(Data key, NullWritable value) { + data.setContent(new Object[] { + new Integer(index), + new Double(index), + String.valueOf(index)}, + Data.ARRAY_RECORD); + index++; + + assertEquals(data.toString(), key.toString()); + } + + @Override + public void close(TaskAttemptContext context) { + // do nothing + } + } + + public static class DummyOutputCommitter extends OutputCommitter { + @Override + public void setupJob(JobContext jobContext) { } + + @Override + public void setupTask(TaskAttemptContext taskContext) { } + + @Override + public void commitTask(TaskAttemptContext taskContext) { } + + @Override + public void abortTask(TaskAttemptContext taskContext) { } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskContext) { + return false; + } + } + } + + public static class DummyLoader extends Loader { + private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION; + private Data expected = new Data(); + private Data actual = new Data(); + + @Override + public void initialize(Context context, DataReader reader) { + Object[] array; + while ((array = reader.readArrayRecord()) != null) { + actual.setContent(array, Data.ARRAY_RECORD); + + expected.setContent(new Object[] { + new Integer(index), + new Double(index), + String.valueOf(index)}, + Data.ARRAY_RECORD); + index++; + + assertEquals(expected.toString(), actual.toString()); + }; + } + } + */ +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java new file mode 100644 index 0000000..d4a7d4d --- /dev/null +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.job.io; + +import java.util.Arrays; + +import junit.framework.TestCase; + +import org.junit.Test; + +public class TestData extends TestCase { + + private static final double TEST_NUMBER = Math.PI + 100; + @Test + public void testArrayToCsv() throws Exception { + Data data = new Data(); + String expected; + String actual; + + // with special characters: + expected = + Long.valueOf((long)TEST_NUMBER) + "," + + Double.valueOf(TEST_NUMBER) + "," + + "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," + + Arrays.toString(new byte[] {1, 2, 3, 4, 5}); + data.setContent(new Object[] { + Long.valueOf((long)TEST_NUMBER), + Double.valueOf(TEST_NUMBER), + String.valueOf(TEST_NUMBER) + "',s", + new byte[] {1, 2, 3, 4, 5} }, + Data.ARRAY_RECORD); + actual = (String)data.getContent(Data.CSV_RECORD); + assertEquals(expected, actual); + + // with null characters: + expected = + Long.valueOf((long)TEST_NUMBER) + "," + + Double.valueOf(TEST_NUMBER) + "," + + "null" + "," + + Arrays.toString(new byte[] {1, 2, 3, 4, 5}); + data.setContent(new Object[] { + Long.valueOf((long)TEST_NUMBER), + Double.valueOf(TEST_NUMBER), + null, + new byte[] {1, 2, 3, 4, 5} }, + Data.ARRAY_RECORD); + actual = (String)data.getContent(Data.CSV_RECORD); + assertEquals(expected, actual); + } + + public static void assertEquals(Object expected, Object actual) { + if (expected instanceof byte[]) { + assertEquals(Arrays.toString((byte[])expected), + Arrays.toString((byte[])actual)); + } else { + TestCase.assertEquals(expected, actual); + } + } + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/pom.xml ---------------------------------------------------------------------- diff --git a/execution/pom.xml b/execution/pom.xml new file mode 100644 index 0000000..fb9f801 --- /dev/null +++ b/execution/pom.xml @@ -0,0 +1,36 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+--><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache</groupId> + <artifactId>sqoop</artifactId> + <version>2.0.0-SNAPSHOT</version> + </parent> + + <groupId>org.apache.sqoop</groupId> + <artifactId>execution</artifactId> + <name>Sqoop Execution Engines</name> + <packaging>pom</packaging> + + <modules> + <module>mapreduce</module> + </modules> + +</project> http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index a4915fd..4211333 100644 --- a/pom.xml +++ b/pom.xml @@ -220,6 +220,7 @@ limitations under the License. <module>client</module> <module>docs</module> <module>connector</module> + <module>execution</module> <module>submission</module> <module>dist</module> </modules> http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/submission/mapreduce/pom.xml ---------------------------------------------------------------------- diff --git a/submission/mapreduce/pom.xml b/submission/mapreduce/pom.xml index 03c06c0..f8a7d3d 100644 --- a/submission/mapreduce/pom.xml +++ b/submission/mapreduce/pom.xml @@ -37,6 +37,12 @@ limitations under the License. </dependency> <dependency> + <groupId>org.apache.sqoop.execution</groupId> + <artifactId>sqoop-execution-mapreduce</artifactId> + <version>2.0.0-SNAPSHOT</version> + </dependency> + + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java ---------------------------------------------------------------------- diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java index 94098de..b8415e3 100644 --- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java +++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java @@ -28,6 +28,8 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.log4j.Logger; import org.apache.sqoop.common.MapContext; import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.execution.mapreduce.MRSubmissionRequest; +import org.apache.sqoop.execution.mapreduce.MapreduceExecutionEngine; import org.apache.sqoop.framework.SubmissionRequest; import org.apache.sqoop.framework.SubmissionEngine; import org.apache.sqoop.job.JobConstants; @@ -116,8 +118,22 @@ public class MapreduceSubmissionEngine extends SubmissionEngine { * {@inheritDoc} */ @Override - @SuppressWarnings("unchecked") - public boolean submit(SubmissionRequest request) { + public boolean isExecutionEngineSupported(Class executionEngineClass) { + if(executionEngineClass == MapreduceExecutionEngine.class) { + return true; + } + + return false; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean submit(SubmissionRequest generalRequest) { + // We're supporting only map reduce jobs + MRSubmissionRequest request = (MRSubmissionRequest) generalRequest; + // Clone global configuration Configuration configuration = new 
Configuration(globalConfiguration);
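
The hunk above ends mid-method, but the last visible statement is the point of the "Clone global configuration" comment: Hadoop's Configuration copy constructor produces an independent copy, so per-job settings applied inside submit() cannot leak back into the engine-wide globalConfiguration. A small standalone sketch of that behaviour, for illustration only (the key names are made up):

  import org.apache.hadoop.conf.Configuration;

  public class ConfigCloneSketch {
    public static void main(String[] args) {
      Configuration global = new Configuration();
      global.set("sample.key", "global-value");

      // Copy constructor clones the current entries, as in submit() above.
      Configuration perJob = new Configuration(global);
      perJob.set("sample.key", "job-specific-value");

      System.out.println(global.get("sample.key")); // prints: global-value
      System.out.println(perJob.get("sample.key")); // prints: job-specific-value
    }
  }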
