Updated Branches: refs/heads/sqoop2 47cb311a5 -> 3a88f0b48
SQOOP-589: Framework-defined text/sequence loaders for HDFS. (Bilung Lee via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/3a88f0b4 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/3a88f0b4 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/3a88f0b4 Branch: refs/heads/sqoop2 Commit: 3a88f0b48c7350af5b373c45209c6452b07d7a98 Parents: 47cb311 Author: Jarek Jarcec Cecho <[email protected]> Authored: Tue Sep 18 09:21:08 2012 +0200 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Tue Sep 18 09:21:08 2012 +0200 ---------------------------------------------------------------------- .../main/java/org/apache/sqoop/core/CoreError.java | 6 +- .../java/org/apache/sqoop/job/JobConstants.java | 11 +- .../java/org/apache/sqoop/job/etl/EtlContext.java | 11 +- .../apache/sqoop/job/etl/EtlMutableContext.java | 7 - .../sqoop/job/etl/HdfsSequenceImportLoader.java | 107 +++++++ .../apache/sqoop/job/etl/HdfsTextImportLoader.java | 100 ++++++ .../main/java/org/apache/sqoop/job/io/Data.java | 79 +++-- .../apache/sqoop/job/mr/SqoopFileOutputFormat.java | 25 ++- .../java/org/apache/sqoop/job/mr/SqoopMapper.java | 9 +- .../job/mr/SqoopOutputFormatLoadExecutor.java | 7 +- .../test/java/org/apache/sqoop/job/FileUtils.java | 40 +++ .../test/java/org/apache/sqoop/job/JobUtils.java | 69 +++++ .../java/org/apache/sqoop/job/TestHdfsLoad.java | 233 +++++++++++++++ .../java/org/apache/sqoop/job/TestMapReduce.java | 55 +--- .../java/org/apache/sqoop/job/etl/Context.java | 2 - .../org/apache/sqoop/job/etl/MutableContext.java | 2 - .../java/org/apache/sqoop/job/io/DataReader.java | 2 + .../java/org/apache/sqoop/job/io/DataWriter.java | 2 + 18 files changed, 662 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/core/src/main/java/org/apache/sqoop/core/CoreError.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/core/CoreError.java b/core/src/main/java/org/apache/sqoop/core/CoreError.java index 303cd80..2697eef 100644 --- a/core/src/main/java/org/apache/sqoop/core/CoreError.java +++ b/core/src/main/java/org/apache/sqoop/core/CoreError.java @@ -54,7 +54,7 @@ public enum CoreError implements ErrorCode { /** Error occurs during job execution. */ CORE_0008("Error occurs during job execution"), - /** The system was load to instantiate the specified class. */ + /** The system was unable to load the specified class. */ CORE_0009("Unable to load the specified class"), /** The system was unable to instantiate the specified class. */ @@ -63,8 +63,8 @@ public enum CoreError implements ErrorCode { /** The parameter already exists in the context */ CORE_0011("The parameter already exists in the context"), - /** The data type is not supported */ - CORE_0012("The data type is not supported"), + /** The type is not supported */ + CORE_0012("The type is not supported"), /** Cannot write to the data writer */ CORE_0013("Cannot write to the data writer"), http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/core/src/main/java/org/apache/sqoop/job/JobConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/job/JobConstants.java b/core/src/main/java/org/apache/sqoop/job/JobConstants.java index 0c1e0d0..54fc543 100644 --- a/core/src/main/java/org/apache/sqoop/job/JobConstants.java +++ b/core/src/main/java/org/apache/sqoop/job/JobConstants.java @@ -28,6 +28,7 @@ public final class JobConstants { public static final String PREFIX_JOB_CONFIG = ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "job."; + public static final String JOB_ETL_PARTITIONER = PREFIX_JOB_CONFIG + "etl.partitioner"; @@ -37,11 +38,13 @@ public final class JobConstants { public static final String JOB_ETL_LOADER = 
PREFIX_JOB_CONFIG + "etl.loader"; - public static final String JOB_ETL_FIELD_NAMES = PREFIX_JOB_CONFIG - + "etl.field.names"; - public static final String JOB_ETL_OUTPUT_DIRECTORY = PREFIX_JOB_CONFIG - + "etl.output.directory"; + public static final String JOB_MR_OUTPUT_FILE = PREFIX_JOB_CONFIG + + "mr.output.file"; + + public static final String JOB_MR_OUTPUT_CODEC = PREFIX_JOB_CONFIG + + "mr.output.codec"; + private JobConstants() { // Disable explicit object creation http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/core/src/main/java/org/apache/sqoop/job/etl/EtlContext.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/job/etl/EtlContext.java b/core/src/main/java/org/apache/sqoop/job/etl/EtlContext.java index da38d59..09eca58 100644 --- a/core/src/main/java/org/apache/sqoop/job/etl/EtlContext.java +++ b/core/src/main/java/org/apache/sqoop/job/etl/EtlContext.java @@ -18,8 +18,6 @@ package org.apache.sqoop.job.etl; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.StringUtils; -import org.apache.sqoop.job.JobConstants; /** * An immutable context used in the ETL framework @@ -33,14 +31,13 @@ public class EtlContext implements Context { this.conf = conf; } - @Override - public String getString(String key) { - return conf.get(key); + protected Configuration getConfiguration() { + return conf; } @Override - public String[] getFieldNames() { - return StringUtils.getStrings(getString(JobConstants.JOB_ETL_FIELD_NAMES)); + public String getString(String key) { + return conf.get(key); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/core/src/main/java/org/apache/sqoop/job/etl/EtlMutableContext.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/job/etl/EtlMutableContext.java b/core/src/main/java/org/apache/sqoop/job/etl/EtlMutableContext.java index 613b14f..e111956 100644 --- 
a/core/src/main/java/org/apache/sqoop/job/etl/EtlMutableContext.java +++ b/core/src/main/java/org/apache/sqoop/job/etl/EtlMutableContext.java @@ -18,10 +18,8 @@ package org.apache.sqoop.job.etl; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.StringUtils; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.core.CoreError; -import org.apache.sqoop.job.JobConstants; /** * A mutable context used in the ETL framework. @@ -42,9 +40,4 @@ public class EtlMutableContext extends EtlContext implements MutableContext { conf.set(key, value); } - @Override - public void setFieldNames(String[] names) { - setString(JobConstants.JOB_ETL_FIELD_NAMES, StringUtils.arrayToString(names)); - } - } http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java b/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java new file mode 100644 index 0000000..ad513e1 --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.job.etl; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.core.CoreError; +import org.apache.sqoop.job.JobConstants; +import org.apache.sqoop.job.etl.Loader; +import org.apache.sqoop.job.io.Data; +import org.apache.sqoop.job.io.DataReader; +import org.apache.sqoop.utils.ClassLoadingUtils; + +public class HdfsSequenceImportLoader extends Loader { + + public static final String extension = ".seq"; + + private final char fieldDelimiter; + private final char recordDelimiter; + + public HdfsSequenceImportLoader() { + fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER; + recordDelimiter = Data.DEFAULT_RECORD_DELIMITER; + } + + @Override + public void run(Context context, DataReader reader) { + Configuration conf = ((EtlContext)context).getConfiguration(); + String filename = + context.getString(JobConstants.JOB_MR_OUTPUT_FILE); + String codecname = context.getString(JobConstants.JOB_MR_OUTPUT_CODEC); + + CompressionCodec codec = null; + if (codecname != null) { + Class<?> clz = ClassLoadingUtils.loadClass(codecname); + if (clz == null) { + throw new SqoopException(CoreError.CORE_0009, codecname); + } + + try { + codec = (CompressionCodec) clz.newInstance(); + if (codec instanceof Configurable) { + ((Configurable) codec).setConf(conf); + } + } catch (Exception e) { + throw new SqoopException(CoreError.CORE_0010, codecname, e); + } + } + + filename += extension; + + try { + Path filepath = new Path(filename); + SequenceFile.Writer 
filewriter; + if (codecname != null) { + filewriter = SequenceFile.createWriter(conf, + SequenceFile.Writer.file(filepath), + SequenceFile.Writer.keyClass(Text.class), + SequenceFile.Writer.valueClass(NullWritable.class), + SequenceFile.Writer.compression(CompressionType.BLOCK, codec)); + } else { + filewriter = SequenceFile.createWriter(conf, + SequenceFile.Writer.file(filepath), + SequenceFile.Writer.keyClass(Text.class), + SequenceFile.Writer.valueClass(NullWritable.class), + SequenceFile.Writer.compression(CompressionType.NONE)); + } + + Object record; + Text text = new Text(); + while ((record = reader.readRecord()) != null) { + text.set(Data.format(record, fieldDelimiter, recordDelimiter)); + filewriter.append(text, NullWritable.get()); + }; + filewriter.close(); + + } catch (IOException e) { + throw new SqoopException(CoreError.CORE_0018, e); + } + + } + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java b/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java new file mode 100644 index 0000000..1368a5e --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.job.etl; + +import java.io.BufferedWriter; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.core.CoreError; +import org.apache.sqoop.job.JobConstants; +import org.apache.sqoop.job.etl.Loader; +import org.apache.sqoop.job.io.Data; +import org.apache.sqoop.job.io.DataReader; +import org.apache.sqoop.utils.ClassLoadingUtils; + +public class HdfsTextImportLoader extends Loader { + + private final char fieldDelimiter; + private final char recordDelimiter; + + public HdfsTextImportLoader() { + fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER; + recordDelimiter = Data.DEFAULT_RECORD_DELIMITER; + } + + @Override + public void run(Context context, DataReader reader) { + Configuration conf = ((EtlContext)context).getConfiguration(); + String filename = context.getString(JobConstants.JOB_MR_OUTPUT_FILE); + String codecname = context.getString(JobConstants.JOB_MR_OUTPUT_CODEC); + + CompressionCodec codec = null; + if (codecname != null) { + Class<?> clz = ClassLoadingUtils.loadClass(codecname); + if (clz == null) { + throw new SqoopException(CoreError.CORE_0009, codecname); + } + + try { + codec = (CompressionCodec) clz.newInstance(); + if (codec instanceof Configurable) { + ((Configurable) 
codec).setConf(conf); + } + } catch (Exception e) { + throw new SqoopException(CoreError.CORE_0010, codecname, e); + } + + filename += codec.getDefaultExtension(); + } + + try { + Path filepath = new Path(filename); + FileSystem fs = filepath.getFileSystem(conf); + + BufferedWriter filewriter; + DataOutputStream filestream = fs.create(filepath, false); + if (codecname != null) { + filewriter = new BufferedWriter(new OutputStreamWriter( + codec.createOutputStream(filestream, codec.createCompressor()), + Data.CHARSET_NAME)); + } else { + filewriter = new BufferedWriter(new OutputStreamWriter( + filestream, Data.CHARSET_NAME)); + } + + Object record; + while ((record = reader.readRecord()) != null) { + filewriter.write(Data.format(record, fieldDelimiter, recordDelimiter)); + }; + filewriter.close(); + + } catch (IOException e) { + throw new SqoopException(CoreError.CORE_0018, e); + } + + } + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/core/src/main/java/org/apache/sqoop/job/io/Data.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/job/io/Data.java b/core/src/main/java/org/apache/sqoop/job/io/Data.java index ce5e2aa..2732e83 100644 --- a/core/src/main/java/org/apache/sqoop/job/io/Data.java +++ b/core/src/main/java/org/apache/sqoop/job/io/Data.java @@ -38,13 +38,14 @@ public class Data implements WritableComparable<Data> { // - String for a text of CSV record private Object content = null; - private static final int EMPTY_DATA = 0; - private static final int CSV_RECORD = 1; - private static final int ARRAY_RECORD = 2; + public static final int EMPTY_DATA = 0; + public static final int CSV_RECORD = 1; + public static final int ARRAY_RECORD = 2; private int type = EMPTY_DATA; - private static char FIELD_DELIMITER = ','; - private static char RECORD_DELIMITER = '\n'; + public static final char DEFAULT_FIELD_DELIMITER = ','; + public static final char DEFAULT_RECORD_DELIMITER = 
'\n'; + public static final String CHARSET_NAME = "UTF-8"; public void setContent(Object content) { if (content == null) { @@ -72,10 +73,46 @@ public class Data implements WritableComparable<Data> { return (type == EMPTY_DATA); } + public static String format(Object content, + char fieldDelimiter, char recordDelimiter) { + if (content instanceof String) { + return (String)content + recordDelimiter; + + } else if (content instanceof Object[]) { + StringBuilder sb = new StringBuilder(); + Object[] array = (Object[])content; + for (int i = 0; i < array.length; i++) { + if (i != 0) { + sb.append(fieldDelimiter); + } + + if (array[i] instanceof String) { + // TODO: Also need to escape those special characters as documented in: + // https://cwiki.apache.org/confluence/display/SQOOP/Sqoop2+Intermediate+representation#Sqoop2Intermediaterepresentation-Intermediateformatrepresentationproposal + sb.append("\'"); + sb.append(((String)array[i]).replaceAll( + "\'", Matcher.quoteReplacement("\\\'"))); + sb.append("\'"); + } else if (array[i] instanceof byte[]) { + sb.append(Arrays.toString((byte[])array[i])); + } else { + sb.append(array[i].toString()); + } + } + sb.append(recordDelimiter); + return sb.toString(); + + } else { + throw new SqoopException(CoreError.CORE_0012, + content.getClass().getName()); + } + } + @Override public int compareTo(Data other) { - byte[] myBytes = toString().getBytes(Charset.forName("UTF-8")); - byte[] otherBytes = other.toString().getBytes(Charset.forName("UTF-8")); + byte[] myBytes = toString().getBytes(Charset.forName(CHARSET_NAME)); + byte[] otherBytes = other.toString().getBytes( + Charset.forName(CHARSET_NAME)); return WritableComparator.compareBytes( myBytes, 0, myBytes.length, otherBytes, 0, otherBytes.length); } @@ -114,33 +151,7 @@ public class Data implements WritableComparable<Data> { @Override public String toString() { - switch (type) { - case CSV_RECORD: - return (String)content + RECORD_DELIMITER; - case ARRAY_RECORD: - 
StringBuilder sb = new StringBuilder(); - Object[] array = (Object[])content; - for (int i = 0; i < array.length; i++) { - if (i != 0) { - sb.append(FIELD_DELIMITER); - } - - if (array[i] instanceof String) { - sb.append("\'"); - sb.append(((String)array[i]).replaceAll( - "\'", Matcher.quoteReplacement("\\\'"))); - sb.append("\'"); - } else if (array[i] instanceof byte[]) { - sb.append(Arrays.toString((byte[])array[i])); - } else { - sb.append(array[i].toString()); - } - } - sb.append(RECORD_DELIMITER); - return sb.toString(); - default: - throw new SqoopException(CoreError.CORE_0012, String.valueOf(type)); - } + return format(content, DEFAULT_FIELD_DELIMITER, DEFAULT_RECORD_DELIMITER); } @Override http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java b/core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java index f4d30f6..c465f10 100644 --- a/core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java +++ b/core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java @@ -18,12 +18,19 @@ package org.apache.sqoop.job.mr; +import java.io.IOException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.DefaultCodec; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.io.Data; /** @@ -35,9 +42,25 @@ public class SqoopFileOutputFormat public static final Log LOG = 
LogFactory.getLog(SqoopFileOutputFormat.class.getName()); + public static final Class<? extends CompressionCodec> DEFAULT_CODEC = + DefaultCodec.class; + @Override public RecordWriter<Data, NullWritable> getRecordWriter( - TaskAttemptContext context) { + TaskAttemptContext context) throws IOException { + Configuration conf = context.getConfiguration(); + + Path filepath = getDefaultWorkFile(context, ""); + String filename = filepath.toString(); + conf.set(JobConstants.JOB_MR_OUTPUT_FILE, filename); + + boolean isCompressed = getCompressOutput(context); + if (isCompressed) { + String codecname = + conf.get(FileOutputFormat.COMPRESS_CODEC, DEFAULT_CODEC.getName()); + conf.set(JobConstants.JOB_MR_OUTPUT_CODEC, codecname); + } + SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(context); return executor.getRecordWriter(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java index 8f1e9a9..eb02271 100644 --- a/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java +++ b/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java @@ -80,20 +80,21 @@ public class SqoopMapper @Override public void writeArrayRecord(Object[] record) { - writeContent(record); + writeRecord(record); } @Override public void writeCsvRecord(String csv) { - writeContent(csv); + writeRecord(csv); } - private void writeContent(Object content) { + @Override + public void writeRecord(Object record) { if (data == null) { data = new Data(); } - data.setContent(content); + data.setContent(record); try { context.write(data, NullWritable.get()); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java index 65a8cdf..71e76ca 100644 --- a/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java +++ b/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java @@ -127,15 +127,16 @@ public class SqoopOutputFormatLoadExecutor { public class OutputFormatDataReader extends DataReader { @Override public Object[] readArrayRecord() { - return (Object[])readContent(); + return (Object[])readRecord(); } @Override public String readCsvRecord() { - return (String)readContent(); + return (String)readRecord(); } - private Object readContent() { + @Override + public Object readRecord() { synchronized (data) { if (writerFinished) { return null; http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/core/src/test/java/org/apache/sqoop/job/FileUtils.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/sqoop/job/FileUtils.java b/core/src/test/java/org/apache/sqoop/job/FileUtils.java new file mode 100644 index 0000000..4b075d2 --- /dev/null +++ b/core/src/test/java/org/apache/sqoop/job/FileUtils.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.job; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +public class FileUtils { + + public static void delete(String file) throws IOException { + Path path = new Path(file); + FileSystem fs = path.getFileSystem(new Configuration()); + if (fs.exists(path)) { + fs.delete(path, true); + } + } + + private FileUtils() { + // Disable explicit object creation + } + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/core/src/test/java/org/apache/sqoop/job/JobUtils.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/sqoop/job/JobUtils.java b/core/src/test/java/org/apache/sqoop/job/JobUtils.java new file mode 100644 index 0000000..e6ead3f --- /dev/null +++ b/core/src/test/java/org/apache/sqoop/job/JobUtils.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.job; + +import java.io.IOException; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.sqoop.job.io.Data; +import org.apache.sqoop.job.mr.SqoopFileOutputFormat; +import org.apache.sqoop.job.mr.SqoopInputFormat; +import org.apache.sqoop.job.mr.SqoopMapper; +import org.apache.sqoop.job.mr.SqoopNullOutputFormat; +import org.apache.sqoop.job.mr.SqoopSplit; + +public class JobUtils { + + public static void runJob(Configuration conf) + throws IOException, InterruptedException, ClassNotFoundException { + runJob(conf, SqoopInputFormat.class, SqoopMapper.class, + (conf.get(FileOutputFormat.OUTDIR) != null) ? + SqoopFileOutputFormat.class : SqoopNullOutputFormat.class); + } + + public static void runJob(Configuration conf, + Class<? extends InputFormat<SqoopSplit, NullWritable>> input, + Class<? extends Mapper<SqoopSplit, NullWritable, Data, NullWritable>> mapper, + Class<? 
extends OutputFormat<Data, NullWritable>> output) + throws IOException, InterruptedException, ClassNotFoundException { + Job job = Job.getInstance(conf); + job.setInputFormatClass(input); + job.setMapperClass(mapper); + job.setMapOutputKeyClass(Data.class); + job.setMapOutputValueClass(NullWritable.class); + job.setOutputFormatClass(output); + job.setOutputKeyClass(Data.class); + job.setOutputValueClass(NullWritable.class); + + boolean success = job.waitForCompletion(true); + Assert.assertEquals("Job failed!", true, success); + } + + private JobUtils() { + // Disable explicit object creation + } + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/core/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/core/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java new file mode 100644 index 0000000..ab05c8e --- /dev/null +++ b/core/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.sqoop.job;

import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.LinkedList;
import java.util.List;

import junit.framework.TestCase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.sqoop.job.etl.Context;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
import org.apache.sqoop.job.etl.HdfsTextImportLoader;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.DataWriter;
import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
import org.junit.Test;

/**
 * End-to-end tests for the framework-defined HDFS import loaders
 * ({@link HdfsTextImportLoader} and {@link HdfsSequenceImportLoader}),
 * each exercised with and without output compression.
 *
 * A dummy partitioner/extractor pair generates NUMBER_OF_IDS partitions
 * with NUMBER_OF_ROWS_PER_ID rows each; the tests then read the single
 * reducer output file back and verify every record round-trips through
 * {@link Data#format}.
 */
public class TestHdfsLoad extends TestCase {

  private static final String OUTPUT_ROOT = "/tmp/sqoop/warehouse/";
  private static final String OUTPUT_FILE = "part-r-00000";
  private static final int START_ID = 1;
  private static final int NUMBER_OF_IDS = 9;
  private static final int NUMBER_OF_ROWS_PER_ID = 10;

  // Per-test-class output directory under OUTPUT_ROOT.
  private String outdir;

  public TestHdfsLoad() {
    // OUTPUT_ROOT already ends with '/'; concatenating another '/'
    // (as the original did) produced a harmless but ugly double slash.
    outdir = OUTPUT_ROOT + getClass().getSimpleName();
  }

  @Test
  public void testUncompressedText() throws Exception {
    FileUtils.delete(outdir);

    Configuration conf = new Configuration();
    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
    conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
    conf.set(FileOutputFormat.OUTDIR, outdir);
    JobUtils.runJob(conf);

    Path filepath = new Path(outdir, OUTPUT_FILE);
    FileSystem fs = filepath.getFileSystem(conf);
    DataInputStream filestream = new DataInputStream(fs.open(filepath));
    BufferedReader filereader = new BufferedReader(new InputStreamReader(
        filestream, Data.CHARSET_NAME));
    // Close in finally so a failed assertion does not leak the stream.
    try {
      verifyOutputText(filereader);
    } finally {
      filereader.close();
    }
  }

  @Test
  public void testCompressedText() throws Exception {
    FileUtils.delete(outdir);

    Configuration conf = new Configuration();
    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
    conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
    conf.set(FileOutputFormat.OUTDIR, outdir);
    conf.setBoolean(FileOutputFormat.COMPRESS, true);
    JobUtils.runJob(conf);

    // Resolve whichever codec the job actually used so the expected
    // file extension and the decompressing input stream both match.
    Class<? extends CompressionCodec> codecClass = conf.getClass(
        FileOutputFormat.COMPRESS_CODEC, SqoopFileOutputFormat.DEFAULT_CODEC)
        .asSubclass(CompressionCodec.class);
    CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
    Path filepath = new Path(outdir,
        OUTPUT_FILE + codec.getDefaultExtension());
    FileSystem fs = filepath.getFileSystem(conf);
    InputStream filestream = codec.createInputStream(fs.open(filepath));
    BufferedReader filereader = new BufferedReader(new InputStreamReader(
        filestream, Data.CHARSET_NAME));
    // Close in finally so a failed assertion does not leak the stream.
    try {
      verifyOutputText(filereader);
    } finally {
      filereader.close();
    }
  }

  /**
   * Reads every line from the text output and checks it against the
   * record produced by {@link Data#format} for the same index.
   * Does not close the reader; callers own the resource.
   */
  private void verifyOutputText(BufferedReader reader) throws IOException {
    String line = null;
    int index = START_ID * NUMBER_OF_ROWS_PER_ID;
    String expected;
    while ((line = reader.readLine()) != null) {
      expected = Data.format(
          new Object[] {
              String.valueOf(index), Integer.valueOf(index), Double.valueOf(index)},
          Data.DEFAULT_FIELD_DELIMITER, Data.DEFAULT_RECORD_DELIMITER);
      index++;

      // readLine() strips the record delimiter; append it back before comparing.
      assertEquals(expected, line + Data.DEFAULT_RECORD_DELIMITER);
    }

    // Every generated row must have been written exactly once.
    assertEquals(NUMBER_OF_IDS * NUMBER_OF_ROWS_PER_ID,
        index - START_ID * NUMBER_OF_ROWS_PER_ID);
  }

  @Test
  public void testUncompressedSequence() throws Exception {
    FileUtils.delete(outdir);

    Configuration conf = new Configuration();
    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
    conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
    conf.set(FileOutputFormat.OUTDIR, outdir);
    JobUtils.runJob(conf);

    Path filepath = new Path(outdir,
        OUTPUT_FILE + HdfsSequenceImportLoader.extension);
    SequenceFile.Reader filereader = new SequenceFile.Reader(conf,
        SequenceFile.Reader.file(filepath));
    // Close in finally so a failed assertion does not leak the reader.
    try {
      verifyOutputSequence(filereader);
    } finally {
      filereader.close();
    }
  }

  @Test
  public void testCompressedSequence() throws Exception {
    FileUtils.delete(outdir);

    Configuration conf = new Configuration();
    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
    conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
    conf.set(FileOutputFormat.OUTDIR, outdir);
    conf.setBoolean(FileOutputFormat.COMPRESS, true);
    JobUtils.runJob(conf);

    // SequenceFile embeds its codec in the file header, so the reader
    // decompresses transparently and the file name keeps the same extension.
    Path filepath = new Path(outdir,
        OUTPUT_FILE + HdfsSequenceImportLoader.extension);
    SequenceFile.Reader filereader = new SequenceFile.Reader(conf,
        SequenceFile.Reader.file(filepath));
    // Close in finally so a failed assertion does not leak the reader.
    try {
      verifyOutputSequence(filereader);
    } finally {
      filereader.close();
    }
  }

  /**
   * Reads every Text key from the sequence-file output and checks it
   * against the record produced by {@link Data#format} for the same index.
   * Does not close the reader; callers own the resource.
   */
  private void verifyOutputSequence(SequenceFile.Reader reader) throws IOException {
    int index = START_ID * NUMBER_OF_ROWS_PER_ID;
    Text expected = new Text();
    Text actual = new Text();
    while (reader.next(actual)) {
      expected.set(Data.format(
          new Object[] {
              String.valueOf(index), Integer.valueOf(index), Double.valueOf(index)},
          Data.DEFAULT_FIELD_DELIMITER, Data.DEFAULT_RECORD_DELIMITER));
      index++;

      assertEquals(expected.toString(), actual.toString());
    }

    // Every generated row must have been written exactly once.
    assertEquals(NUMBER_OF_IDS * NUMBER_OF_ROWS_PER_ID,
        index - START_ID * NUMBER_OF_ROWS_PER_ID);
  }

  /** Writable partition carrying a single integer id. */
  public static class DummyPartition extends Partition {
    private int id;

    public void setId(int id) {
      this.id = id;
    }

    public int getId() {
      return id;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      id = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(id);
    }
  }

  /** Produces one DummyPartition per id in [START_ID, NUMBER_OF_IDS]. */
  public static class DummyPartitioner extends Partitioner {
    @Override
    public List<Partition> run(Context context) {
      List<Partition> partitions = new LinkedList<Partition>();
      for (int id = START_ID; id <= NUMBER_OF_IDS; id++) {
        DummyPartition partition = new DummyPartition();
        partition.setId(id);
        partitions.add(partition);
      }
      return partitions;
    }
  }

  /**
   * Emits NUMBER_OF_ROWS_PER_ID rows per partition; each row is the same
   * index value rendered as String, Integer and Double.
   */
  public static class DummyExtractor extends Extractor {
    @Override
    public void run(Context context, Partition partition, DataWriter writer) {
      int id = ((DummyPartition) partition).getId();
      for (int row = 0; row < NUMBER_OF_ROWS_PER_ID; row++) {
        int value = id * NUMBER_OF_ROWS_PER_ID + row;
        writer.writeArrayRecord(new Object[] {
            String.valueOf(value),
            Integer.valueOf(value),
            Double.valueOf(value)});
      }
    }
  }

}
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; @@ -48,7 +48,7 @@ import org.apache.sqoop.job.mr.SqoopNullOutputFormat; import org.apache.sqoop.job.mr.SqoopSplit; import org.junit.Test; -public class TestMapReduce { +public class TestMapReduce extends TestCase { private static final int START_ID = 1; private static final int NUMBER_OF_IDS = 9; @@ -62,12 +62,12 @@ public class TestMapReduce { SqoopInputFormat inputformat = new SqoopInputFormat(); List<InputSplit> splits = inputformat.getSplits(job); - Assert.assertEquals(9, splits.size()); + assertEquals(9, splits.size()); for (int id = START_ID; id <= NUMBER_OF_IDS; id++) { SqoopSplit split = (SqoopSplit)splits.get(id-1); DummyPartition partition = (DummyPartition)split.getPartition(); - Assert.assertEquals(id, partition.getId()); + assertEquals(id, partition.getId()); } } @@ -77,17 +77,8 @@ public class TestMapReduce { conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); - Job job = Job.getInstance(conf); - job.setInputFormatClass(SqoopInputFormat.class); - job.setMapperClass(SqoopMapper.class); - job.setMapOutputKeyClass(Data.class); - job.setMapOutputValueClass(NullWritable.class); - job.setOutputFormatClass(DummyOutputFormat.class); - job.setOutputKeyClass(Data.class); - job.setOutputValueClass(NullWritable.class); - - boolean success = job.waitForCompletion(true); - Assert.assertEquals("Job failed!", true, success); + JobUtils.runJob(conf, SqoopInputFormat.class, SqoopMapper.class, + DummyOutputFormat.class); } @Test @@ -97,17 +88,8 @@ public class TestMapReduce { conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName()); - Job job = Job.getInstance(conf); - job.setInputFormatClass(SqoopInputFormat.class); - job.setMapperClass(SqoopMapper.class); - job.setMapOutputKeyClass(Data.class); - 
job.setMapOutputValueClass(NullWritable.class); - job.setOutputFormatClass(SqoopNullOutputFormat.class); - job.setOutputKeyClass(Data.class); - job.setOutputValueClass(NullWritable.class); - - boolean success = job.waitForCompletion(true); - Assert.assertEquals("Job failed!", true, success); + JobUtils.runJob(conf, SqoopInputFormat.class, SqoopMapper.class, + SqoopNullOutputFormat.class); } public static class DummyPartition extends Partition { @@ -150,12 +132,10 @@ public class TestMapReduce { public void run(Context context, Partition partition, DataWriter writer) { int id = ((DummyPartition)partition).getId(); for (int row = 0; row < NUMBER_OF_ROWS_PER_ID; row++) { - Object[] array = new Object[] { - String.valueOf(id*NUMBER_OF_ROWS_PER_ID+row), - new Integer(id*NUMBER_OF_ROWS_PER_ID+row), - new Double(id*NUMBER_OF_ROWS_PER_ID+row) - }; - writer.writeArrayRecord(array); + writer.writeArrayRecord(new Object[] { + String.valueOf(id*NUMBER_OF_ROWS_PER_ID+row), + new Integer(id*NUMBER_OF_ROWS_PER_ID+row), + new Double(id*NUMBER_OF_ROWS_PER_ID+row)}); } } } @@ -185,14 +165,13 @@ public class TestMapReduce { @Override public void write(Data key, NullWritable value) { - Object[] record = new Object[] { + data.setContent(new Object[] { String.valueOf(index), new Integer(index), - new Double(index) - }; - data.setContent(record); - Assert.assertEquals(data.toString(), key.toString()); + new Double(index)}); index++; + + assertEquals(data.toString(), key.toString()); } @Override @@ -238,7 +217,7 @@ public class TestMapReduce { new Double(index)}); index++; - Assert.assertEquals(expected.toString(), actual.toString()); + assertEquals(expected.toString(), actual.toString()); }; } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/spi/src/main/java/org/apache/sqoop/job/etl/Context.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Context.java 
b/spi/src/main/java/org/apache/sqoop/job/etl/Context.java index d2c7ebc..7256281 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Context.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Context.java @@ -24,6 +24,4 @@ public interface Context { public String getString(String key); - public String[] getFieldNames(); - } http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/spi/src/main/java/org/apache/sqoop/job/etl/MutableContext.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/MutableContext.java b/spi/src/main/java/org/apache/sqoop/job/etl/MutableContext.java index c321d74..2bfe89f 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/MutableContext.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/MutableContext.java @@ -24,6 +24,4 @@ public interface MutableContext extends Context { public void setString(String key, String value); - public void setFieldNames(String[] names); - } http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java b/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java index 6b90cda..b9b2f49 100644 --- a/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java +++ b/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java @@ -27,4 +27,6 @@ public abstract class DataReader { public abstract String readCsvRecord(); + public abstract Object readRecord(); + } http://git-wip-us.apache.org/repos/asf/sqoop/blob/3a88f0b4/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java b/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java index ca55b2e..29c4283 100644 --- 
a/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java +++ b/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java @@ -27,4 +27,6 @@ public abstract class DataWriter { public abstract void writeCsvRecord(String csv); + public abstract void writeRecord(Object record); + }
