Updated Branches: refs/heads/sqoop2 b61de724e -> 27aa78679
SQOOP-533: Intermediate data format support for import (Bilung Lee via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/27aa7867 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/27aa7867 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/27aa7867 Branch: refs/heads/sqoop2 Commit: 27aa786793887623af655774c7fc4022590e0967 Parents: b61de72 Author: Jarek Jarcec Cecho <[email protected]> Authored: Mon Oct 15 15:07:38 2012 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Mon Oct 15 15:07:38 2012 -0700 ---------------------------------------------------------------------- .../sqoop/connector/jdbc/TestImportExtractor.java | 7 +- .../sqoop/job/etl/HdfsSequenceImportLoader.java | 16 +- .../apache/sqoop/job/etl/HdfsTextImportLoader.java | 10 +- .../main/java/org/apache/sqoop/job/io/Data.java | 157 ++++++++++----- .../java/org/apache/sqoop/job/mr/SqoopMapper.java | 19 ++- .../job/mr/SqoopOutputFormatLoadExecutor.java | 18 +- .../test/java/org/apache/sqoop/io/TestData.java | 76 +++++++ .../test/java/org/apache/sqoop/job/FileUtils.java | 29 +++ .../java/org/apache/sqoop/job/TestHdfsLoad.java | 48 ++--- .../java/org/apache/sqoop/job/TestMapReduce.java | 34 ++-- .../java/org/apache/sqoop/job/io/DataReader.java | 4 +- .../java/org/apache/sqoop/job/io/DataWriter.java | 4 +- 12 files changed, 302 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java index 519286b..70e29e5 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java @@ -144,6 +144,11 @@ public class TestImportExtractor extends TestCase { int indx = START; @Override + public void setFieldDelimiter(char fieldDelimiter) { + // do nothing and use default delimiter + } + + @Override public void writeArrayRecord(Object[] array) { for (int i = 0; i < array.length; i++) { if (array[i] instanceof Integer) { @@ -163,7 +168,7 @@ public class TestImportExtractor extends TestCase { } @Override - public void writeRecord(Object record) { + public void writeContent(Object content, int type) { fail("This method should not be invoked."); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java b/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java index 8802cbc..854d325 100644 --- a/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java +++ b/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java @@ -37,18 +37,18 @@ import org.apache.sqoop.utils.ClassLoadingUtils; public class HdfsSequenceImportLoader extends Loader { - public static final String extension = ".seq"; + public static final String EXTENSION = ".seq"; private final char fieldDelimiter; - private final char recordDelimiter; public HdfsSequenceImportLoader() { fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER; - recordDelimiter = Data.DEFAULT_RECORD_DELIMITER; } @Override public void run(Context context, DataReader reader) { + reader.setFieldDelimiter(fieldDelimiter); + Configuration conf = ((EtlContext)context).getConfiguration(); String filename = context.getString(JobConstants.JOB_MR_OUTPUT_FILE); @@ -71,12 +71,12 @@ public class HdfsSequenceImportLoader extends Loader { } } - filename += extension; + filename += EXTENSION; try { Path filepath = new Path(filename); SequenceFile.Writer filewriter; - if (codecname != null) { + if (codec != null) { filewriter = SequenceFile.createWriter(conf, SequenceFile.Writer.file(filepath), SequenceFile.Writer.keyClass(Text.class), @@ -90,10 +90,10 @@ public class HdfsSequenceImportLoader extends Loader { SequenceFile.Writer.compression(CompressionType.NONE)); } - Object record; + String csv; Text text = new Text(); - while ((record = reader.readRecord()) != null) { - text.set(Data.format(record, fieldDelimiter, recordDelimiter)); + while ((csv = reader.readCsvRecord()) != null) { + text.set(csv); filewriter.append(text, NullWritable.get()); } filewriter.close(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java b/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java index b1ee255..240265b 100644 --- a/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java +++ b/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java @@ -47,6 +47,8 @@ public class HdfsTextImportLoader extends Loader { @Override public void run(Context context, DataReader reader) { + reader.setFieldDelimiter(fieldDelimiter); + Configuration conf = ((EtlContext)context).getConfiguration(); String filename = context.getString(JobConstants.JOB_MR_OUTPUT_FILE); String codecname = context.getString(JobConstants.JOB_MR_OUTPUT_CODEC); @@ -76,7 +78,7 @@ public class HdfsTextImportLoader extends Loader { BufferedWriter filewriter; DataOutputStream filestream = fs.create(filepath, false); - if (codecname != null) { + if (codec != null) { filewriter = new BufferedWriter(new OutputStreamWriter( codec.createOutputStream(filestream, codec.createCompressor()), Data.CHARSET_NAME)); @@ -85,9 +87,9 @@ public class HdfsTextImportLoader extends Loader { filestream, Data.CHARSET_NAME)); } - Object record; - while ((record = reader.readRecord()) != null) { - filewriter.write(Data.format(record, fieldDelimiter, recordDelimiter)); + String csv; + while ((csv = reader.readCsvRecord()) != null) { + filewriter.write(csv + recordDelimiter); } filewriter.close(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/core/src/main/java/org/apache/sqoop/job/io/Data.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/job/io/Data.java b/core/src/main/java/org/apache/sqoop/job/io/Data.java index 2732e83..4ddd132 100644 --- a/core/src/main/java/org/apache/sqoop/job/io/Data.java +++ b/core/src/main/java/org/apache/sqoop/job/io/Data.java @@ -21,6 +21,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.nio.charset.Charset; +import java.util.ArrayList; import java.util.Arrays; import java.util.regex.Matcher; @@ -43,26 +44,45 @@ public class Data implements WritableComparable<Data> { public static final int ARRAY_RECORD = 2; private int type = EMPTY_DATA; - public static final char DEFAULT_FIELD_DELIMITER = ','; - public static final char DEFAULT_RECORD_DELIMITER = '\n'; public static final String CHARSET_NAME = "UTF-8"; - public void setContent(Object content) { - if (content == null) { - this.type = EMPTY_DATA; - } else if (content instanceof String) { - this.type = CSV_RECORD; - } else if (content instanceof Object[]) { - this.type = ARRAY_RECORD; - } else { - throw new SqoopException(CoreError.CORE_0012, - content.getClass().getName()); + public static final char DEFAULT_RECORD_DELIMITER = '\n'; + public static final char DEFAULT_FIELD_DELIMITER = ','; + public static final char DEFAULT_STRING_DELIMITER = '\''; + public static final char DEFAULT_STRING_ESCAPE = '\\'; + private char fieldDelimiter = DEFAULT_FIELD_DELIMITER; + private char stringDelimiter = DEFAULT_STRING_DELIMITER; + private char stringEscape = DEFAULT_STRING_ESCAPE; + private String escapedStringDelimiter = String.valueOf(new char[] { + stringEscape, stringDelimiter + }); + + public void setFieldDelimiter(char fieldDelimiter) { + this.fieldDelimiter = fieldDelimiter; + } + + public void setContent(Object content, int type) { + switch (type) { + case EMPTY_DATA: + case CSV_RECORD: + case ARRAY_RECORD: + this.type = type; + this.content = content; + break; + default: + throw new SqoopException(CoreError.CORE_0012, String.valueOf(type)); } - this.content = content; } - public Object getContent() { - return content; + public Object getContent(int targetType) { + switch (targetType) { + case CSV_RECORD: + return format(); + case ARRAY_RECORD: + return parse(); + default: + throw new SqoopException(CoreError.CORE_0012, String.valueOf(targetType)); + } } public int getType() { @@ -73,39 +93,9 @@ public class Data implements WritableComparable<Data> { return (type == EMPTY_DATA); } - public static String format(Object content, - char fieldDelimiter, char recordDelimiter) { - if (content instanceof String) { - return (String)content + recordDelimiter; - - } else if (content instanceof Object[]) { - StringBuilder sb = new StringBuilder(); - Object[] array = (Object[])content; - for (int i = 0; i < array.length; i++) { - if (i != 0) { - sb.append(fieldDelimiter); - } - - if (array[i] instanceof String) { - // TODO: Also need to escape those special characters as documented in: - // https://cwiki.apache.org/confluence/display/SQOOP/Sqoop2+Intermediate+representation#Sqoop2Intermediaterepresentation-Intermediateformatrepresentationproposal - sb.append("\'"); - sb.append(((String)array[i]).replaceAll( - "\'", Matcher.quoteReplacement("\\\'"))); - sb.append("\'"); - } else if (array[i] instanceof byte[]) { - sb.append(Arrays.toString((byte[])array[i])); - } else { - sb.append(array[i].toString()); - } - } - sb.append(recordDelimiter); - return sb.toString(); - - } else { - throw new SqoopException(CoreError.CORE_0012, - content.getClass().getName()); - } + @Override + public String toString() { + return (String)getContent(CSV_RECORD); } @Override @@ -150,11 +140,6 @@ public class Data implements WritableComparable<Data> { } @Override - public String toString() { - return format(content, DEFAULT_FIELD_DELIMITER, DEFAULT_RECORD_DELIMITER); - } - - @Override public void readFields(DataInput in) throws IOException { type = readType(in); switch (type) { @@ -324,4 +309,70 @@ public class Data implements WritableComparable<Data> { } } + private String format() { + switch (type) { + case EMPTY_DATA: + return null; + + case CSV_RECORD: + if (fieldDelimiter == DEFAULT_FIELD_DELIMITER) { + return (String)content; + } else { + // TODO: need to exclude the case where comma is part of a string. + return ((String)content).replaceAll( + String.valueOf(DEFAULT_FIELD_DELIMITER), + String.valueOf(fieldDelimiter)); + } + + case ARRAY_RECORD: + StringBuilder sb = new StringBuilder(); + Object[] array = (Object[])content; + for (int i = 0; i < array.length; i++) { + if (i != 0) { + sb.append(fieldDelimiter); + } + + if (array[i] instanceof String) { + sb.append(stringDelimiter); + sb.append(escape((String)array[i])); + sb.append(stringDelimiter); + } else if (array[i] instanceof byte[]) { + sb.append(Arrays.toString((byte[])array[i])); + } else { + sb.append(String.valueOf(array[i])); + } + } + return sb.toString(); + + default: + throw new SqoopException(CoreError.CORE_0012, String.valueOf(type)); + } + } + + private Object[] parse() { + switch (type) { + case EMPTY_DATA: + return null; + + case CSV_RECORD: + ArrayList<Object> list = new ArrayList<Object>(); + // todo: need to parse CSV into Array + return list.toArray(); + + case ARRAY_RECORD: + return (Object[])content; + + default: + throw new SqoopException(CoreError.CORE_0012, String.valueOf(type)); + } + } + + private String escape(String string) { + // TODO: Also need to escape those special characters as documented in: + // https://cwiki.apache.org/confluence/display/SQOOP/Sqoop2+Intermediate+representation#Sqoop2Intermediaterepresentation-Intermediateformatrepresentationproposal + String regex = String.valueOf(stringDelimiter); + String replacement = Matcher.quoteReplacement(escapedStringDelimiter); + return string.replaceAll(regex, replacement); + } + } http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java index eb02271..0a9f46d 100644 --- a/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java +++ b/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java @@ -79,22 +79,31 @@ public class SqoopMapper } @Override - public void writeArrayRecord(Object[] record) { - writeRecord(record); + public void setFieldDelimiter(char fieldDelimiter) { + if (data == null) { + data = new Data(); + } + + data.setFieldDelimiter(fieldDelimiter); + } + + @Override + public void writeArrayRecord(Object[] array) { + writeContent(array, Data.ARRAY_RECORD); } @Override public void writeCsvRecord(String csv) { - writeRecord(csv); + writeContent(csv, Data.CSV_RECORD); } @Override - public void writeRecord(Object record) { + public void writeContent(Object content, int type) { if (data == null) { data = new Data(); } - data.setContent(record); + data.setContent(content, type); try { context.write(data, NullWritable.get()); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java index 71e76ca..23fcb62 100644 --- a/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java +++ b/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java @@ -74,7 +74,8 @@ public class SqoopOutputFormatLoadExecutor { data.wait(); } - data.setContent(key.getContent()); + int type = key.getType(); + data.setContent(key.getContent(type), type); // notify reader that the data is ready data.notify(); @@ -126,17 +127,22 @@ public class SqoopOutputFormatLoadExecutor { public class OutputFormatDataReader extends DataReader { @Override + public void setFieldDelimiter(char fieldDelimiter) { + data.setFieldDelimiter(fieldDelimiter); + } + + @Override public Object[] readArrayRecord() { - return (Object[])readRecord(); + return (Object[])readContent(Data.ARRAY_RECORD); } @Override public String readCsvRecord() { - return (String)readRecord(); + return (String)readContent(Data.CSV_RECORD); } @Override - public Object readRecord() { + public Object readContent(int type) { synchronized (data) { if (writerFinished) { return null; @@ -148,8 +154,8 @@ public class SqoopOutputFormatLoadExecutor { data.wait(); } - Object content = data.getContent(); - data.setContent(null); + Object content = data.getContent(type); + data.setContent(null, Data.EMPTY_DATA); // notify writer that data is consumed data.notify(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/core/src/test/java/org/apache/sqoop/io/TestData.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/sqoop/io/TestData.java b/core/src/test/java/org/apache/sqoop/io/TestData.java new file mode 100644 index 0000000..9fe9d41 --- /dev/null +++ b/core/src/test/java/org/apache/sqoop/io/TestData.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.io; + +import java.util.Arrays; + +import junit.framework.TestCase; + +import org.apache.sqoop.job.io.Data; +import org.junit.Test; + +public class TestData extends TestCase { + + private static final double TEST_NUMBER = Math.PI + 100; + @Test + public void testArrayToCsv() throws Exception { + Data data = new Data(); + String expected; + String actual; + + // with special characters: + expected = + Long.valueOf((long)TEST_NUMBER) + "," + + Double.valueOf(TEST_NUMBER) + "," + + "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," + + Arrays.toString(new byte[] {1, 2, 3, 4, 5}); + data.setContent(new Object[] { + Long.valueOf((long)TEST_NUMBER), + Double.valueOf(TEST_NUMBER), + String.valueOf(TEST_NUMBER) + "',s", + new byte[] {1, 2, 3, 4, 5} }, + Data.ARRAY_RECORD); + actual = (String)data.getContent(Data.CSV_RECORD); + assertEquals(expected, actual); + + // with null characters: + expected = + Long.valueOf((long)TEST_NUMBER) + "," + + Double.valueOf(TEST_NUMBER) + "," + + "null" + "," + + Arrays.toString(new byte[] {1, 2, 3, 4, 5}); + data.setContent(new Object[] { + Long.valueOf((long)TEST_NUMBER), + Double.valueOf(TEST_NUMBER), + null, + new byte[] {1, 2, 3, 4, 5} }, + Data.ARRAY_RECORD); + actual = (String)data.getContent(Data.CSV_RECORD); + assertEquals(expected, actual); + } + + public static void assertEquals(Object expected, Object actual) { + if (expected instanceof byte[]) { + assertEquals(Arrays.toString((byte[])expected), + Arrays.toString((byte[])actual)); + } else { + TestCase.assertEquals(expected, actual); + } + } + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/core/src/test/java/org/apache/sqoop/job/FileUtils.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/sqoop/job/FileUtils.java b/core/src/test/java/org/apache/sqoop/job/FileUtils.java index 4b075d2..e685883 100644 --- a/core/src/test/java/org/apache/sqoop/job/FileUtils.java +++ b/core/src/test/java/org/apache/sqoop/job/FileUtils.java @@ -18,6 +18,8 @@ package org.apache.sqoop.job; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -25,6 +27,12 @@ import org.apache.hadoop.fs.Path; public class FileUtils { + public static boolean exists(String file) throws IOException { + Path path = new Path(file); + FileSystem fs = path.getFileSystem(new Configuration()); + return fs.exists(path); + } + public static void delete(String file) throws IOException { Path path = new Path(file); FileSystem fs = path.getFileSystem(new Configuration()); @@ -33,6 +41,27 @@ public class FileUtils { } } + public static void mkdirs(String directory) throws IOException { + Path path = new Path(directory); + FileSystem fs = path.getFileSystem(new Configuration()); + if (!fs.exists(path)) { + fs.mkdirs(path); + } + } + + public static InputStream open(String fileName) + throws IOException, ClassNotFoundException { + Path filepath = new Path(fileName); + FileSystem fs = filepath.getFileSystem(new Configuration()); + return fs.open(filepath); + } + + public static OutputStream create(String fileName) throws IOException { + Path filepath = new Path(fileName); + FileSystem fs = filepath.getFileSystem(new Configuration()); + return fs.create(filepath, false); + } + private FileUtils() { // Disable explicit object creation } http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/core/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/core/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java index ab05c8e..64c767c 100644 --- a/core/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java +++ b/core/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java @@ -19,7 +19,6 @@ package org.apache.sqoop.job; import java.io.BufferedReader; import java.io.DataInput; -import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; import java.io.InputStream; @@ -30,7 +29,6 @@ import java.util.List; import junit.framework.TestCase; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; @@ -73,9 +71,8 @@ public class TestHdfsLoad extends TestCase { conf.set(FileOutputFormat.OUTDIR, outdir); JobUtils.runJob(conf); - Path filepath = new Path(outdir, OUTPUT_FILE); - FileSystem fs = filepath.getFileSystem(conf); - DataInputStream filestream = new DataInputStream(fs.open(filepath)); + String fileName = outdir + "/" + OUTPUT_FILE; + InputStream filestream = FileUtils.open(fileName); BufferedReader filereader = new BufferedReader(new InputStreamReader( filestream, Data.CHARSET_NAME)); verifyOutputText(filereader); @@ -97,27 +94,26 @@ public class TestHdfsLoad extends TestCase { FileOutputFormat.COMPRESS_CODEC, SqoopFileOutputFormat.DEFAULT_CODEC) .asSubclass(CompressionCodec.class); CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf); - Path filepath = new Path(outdir, - OUTPUT_FILE + codec.getDefaultExtension()); - FileSystem fs = filepath.getFileSystem(conf); - InputStream filestream = codec.createInputStream(fs.open(filepath)); + String fileName = outdir + "/" + OUTPUT_FILE + codec.getDefaultExtension(); + InputStream filestream = codec.createInputStream(FileUtils.open(fileName)); BufferedReader filereader = new BufferedReader(new InputStreamReader( filestream, Data.CHARSET_NAME)); verifyOutputText(filereader); } private void verifyOutputText(BufferedReader reader) throws IOException { - String line = null; - int index = START_ID*NUMBER_OF_ROWS_PER_ID; + String actual = null; String expected; - while ((line = reader.readLine()) != null){ - expected = Data.format( - new Object[] {String.valueOf(index), new Integer(index), new Double(index)}, - Data.DEFAULT_FIELD_DELIMITER, Data.DEFAULT_RECORD_DELIMITER); + Data data = new Data(); + int index = START_ID*NUMBER_OF_ROWS_PER_ID; + while ((actual = reader.readLine()) != null){ + data.setContent(new Object[] { + new Integer(index), new Double(index), String.valueOf(index) }, + Data.ARRAY_RECORD); + expected = data.toString(); index++; - assertEquals(expected.toString(), - line + Data.DEFAULT_RECORD_DELIMITER); + assertEquals(expected, actual); } reader.close(); @@ -137,7 +133,7 @@ public class TestHdfsLoad extends TestCase { JobUtils.runJob(conf); Path filepath = new Path(outdir, - OUTPUT_FILE + HdfsSequenceImportLoader.extension); + OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION); SequenceFile.Reader filereader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(filepath)); verifyOutputSequence(filereader); @@ -156,7 +152,7 @@ public class TestHdfsLoad extends TestCase { JobUtils.runJob(conf); Path filepath = new Path(outdir, - OUTPUT_FILE + HdfsSequenceImportLoader.extension); + OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION); SequenceFile.Reader filereader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(filepath)); verifyOutputSequence(filereader); @@ -164,12 +160,14 @@ public class TestHdfsLoad extends TestCase { private void verifyOutputSequence(SequenceFile.Reader reader) throws IOException { int index = START_ID*NUMBER_OF_ROWS_PER_ID; - Text expected = new Text(); Text actual = new Text(); + Text expected = new Text(); + Data data = new Data(); while (reader.next(actual)){ - expected.set(Data.format( - new Object[] {String.valueOf(index), new Integer(index), new Double(index)}, - Data.DEFAULT_FIELD_DELIMITER, Data.DEFAULT_RECORD_DELIMITER)); + data.setContent(new Object[] { + new Integer(index), new Double(index), String.valueOf(index) }, + Data.ARRAY_RECORD); + expected.set(data.toString()); index++; assertEquals(expected.toString(), actual.toString()); @@ -221,9 +219,9 @@ public class TestHdfsLoad extends TestCase { int id = ((DummyPartition)partition).getId(); for (int row = 0; row < NUMBER_OF_ROWS_PER_ID; row++) { Object[] array = new Object[] { - String.valueOf(id*NUMBER_OF_ROWS_PER_ID+row), new Integer(id*NUMBER_OF_ROWS_PER_ID+row), - new Double(id*NUMBER_OF_ROWS_PER_ID+row) + new Double(id*NUMBER_OF_ROWS_PER_ID+row), + String.valueOf(id*NUMBER_OF_ROWS_PER_ID+row) }; writer.writeArrayRecord(array); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java index f4701db..7646f57 100644 --- a/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java +++ b/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java @@ -50,9 +50,9 @@ import org.junit.Test; public class TestMapReduce extends TestCase { - private static final int START_ID = 1; - private static final int NUMBER_OF_IDS = 9; - private static final int NUMBER_OF_ROWS_PER_ID = 10; + private static final int START_PARTITION = 1; + private static final int NUMBER_OF_PARTITIONS = 9; + private static final int NUMBER_OF_ROWS_PER_PARTITION = 10; @Test public void testInputFormat() throws Exception { @@ -64,7 +64,7 @@ public class TestMapReduce extends TestCase { List<InputSplit> splits = inputformat.getSplits(job); assertEquals(9, splits.size()); - for (int id = START_ID; id <= NUMBER_OF_IDS; id++) { + for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) { SqoopSplit split = (SqoopSplit)splits.get(id-1); DummyPartition partition = (DummyPartition)split.getPartition(); assertEquals(id, partition.getId()); @@ -118,7 +118,7 @@ public class TestMapReduce extends TestCase { @Override public List<Partition> run(Context context) { List<Partition> partitions = new LinkedList<Partition>(); - for (int id = START_ID; id <= NUMBER_OF_IDS; id++) { + for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) { DummyPartition partition = new DummyPartition(); partition.setId(id); partitions.add(partition); @@ -131,11 +131,11 @@ public class TestMapReduce extends TestCase { @Override public void run(Context context, Partition partition, DataWriter writer) { int id = ((DummyPartition)partition).getId(); - for (int row = 0; row < NUMBER_OF_ROWS_PER_ID; row++) { + for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) { writer.writeArrayRecord(new Object[] { - String.valueOf(id*NUMBER_OF_ROWS_PER_ID+row), - new Integer(id*NUMBER_OF_ROWS_PER_ID+row), - new Double(id*NUMBER_OF_ROWS_PER_ID+row)}); + new Integer(id*NUMBER_OF_ROWS_PER_PARTITION+row), + new Double(id*NUMBER_OF_ROWS_PER_PARTITION+row), + String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)}); } } } @@ -160,15 +160,16 @@ public class TestMapReduce extends TestCase { public static class DummyRecordWriter extends RecordWriter<Data, NullWritable> { - private int index = START_ID*NUMBER_OF_ROWS_PER_ID; + private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION; private Data data = new Data(); @Override public void write(Data key, NullWritable value) { data.setContent(new Object[] { - String.valueOf(index), new Integer(index), - new Double(index)}); + new Double(index), + String.valueOf(index)}, + Data.ARRAY_RECORD); index++; assertEquals(data.toString(), key.toString()); @@ -201,7 +202,7 @@ public class TestMapReduce extends TestCase { } public static class DummyLoader extends Loader { - private int index = START_ID*NUMBER_OF_ROWS_PER_ID; + private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION; private Data expected = new Data(); private Data actual = new Data(); @@ -209,12 +210,13 @@ public class TestMapReduce extends TestCase { public void run(Context context, DataReader reader) { Object[] array; while ((array = reader.readArrayRecord()) != null) { - actual.setContent(array); + actual.setContent(array, Data.ARRAY_RECORD); expected.setContent(new Object[] { - String.valueOf(index), new Integer(index), - new Double(index)}); + new Double(index), + String.valueOf(index)}, + Data.ARRAY_RECORD); index++; assertEquals(expected.toString(), actual.toString()); http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java b/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java index b9b2f49..18e2fb7 100644 --- a/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java +++ b/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java @@ -27,6 +27,8 @@ public abstract class DataReader { public abstract String readCsvRecord(); - public abstract Object readRecord(); + public abstract Object readContent(int type); + + public abstract void setFieldDelimiter(char fieldDelimiter); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/27aa7867/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java b/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java index 29c4283..30a0c7c 100644 --- a/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java +++ b/spi/src/main/java/org/apache/sqoop/job/io/DataWriter.java @@ -27,6 +27,8 @@ public abstract class DataWriter { public abstract void writeCsvRecord(String csv); - public abstract void writeRecord(Object record); + public abstract void writeContent(Object content, int type); + + public abstract void setFieldDelimiter(char fieldDelimiter); }
