SQOOP-777: Sqoop2: Implement intermediate data format representation policy
(Abraham Elmahrek via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/3c93930b Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/3c93930b Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/3c93930b Branch: refs/heads/SQOOP-1367 Commit: 3c93930bf3d35a3910541ec3099b44c32bf7adf7 Parents: 17c7219 Author: Jarek Jarcec Cecho <[email protected]> Authored: Sat Jul 26 11:37:50 2014 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Sat Jul 26 11:37:50 2014 -0700 ---------------------------------------------------------------------- common/pom.xml | 5 + .../org/apache/sqoop/etl/io/DataReader.java | 25 +- .../org/apache/sqoop/etl/io/DataWriter.java | 20 +- .../org/apache/sqoop/schema/type/Column.java | 5 + .../connector/jdbc/GenericJdbcConnector.java | 4 +- .../jdbc/GenericJdbcConnectorError.java | 2 + .../jdbc/GenericJdbcExportInitializer.java | 53 ++- .../jdbc/GenericJdbcImportInitializer.java | 7 +- .../sqoop/connector/jdbc/TestExportLoader.java | 10 +- .../connector/jdbc/TestImportExtractor.java | 9 +- .../connector/jdbc/TestImportInitializer.java | 2 +- connector/connector-sdk/pom.xml | 6 + .../idf/CSVIntermediateDataFormat.java | 355 +++++++++++++++++++ .../connector/idf/IntermediateDataFormat.java | 143 ++++++++ .../idf/IntermediateDataFormatError.java | 57 +++ .../idf/CSVIntermediateDataFormatTest.java | 222 ++++++++++++ .../org/apache/sqoop/framework/JobManager.java | 8 + .../sqoop/framework/SubmissionRequest.java | 15 + execution/mapreduce/pom.xml | 4 + .../mapreduce/MapreduceExecutionEngine.java | 61 ++-- .../java/org/apache/sqoop/job/JobConstants.java | 4 + .../sqoop/job/etl/HdfsExportExtractor.java | 12 +- .../sqoop/job/etl/HdfsSequenceImportLoader.java | 10 +- .../sqoop/job/etl/HdfsTextImportLoader.java | 12 +- .../org/apache/sqoop/job/io/SqoopWritable.java | 59 +++ .../sqoop/job/mr/SqoopFileOutputFormat.java | 7 +- 
.../org/apache/sqoop/job/mr/SqoopMapper.java | 47 +-- .../sqoop/job/mr/SqoopNullOutputFormat.java | 6 +- .../job/mr/SqoopOutputFormatLoadExecutor.java | 85 +++-- .../org/apache/sqoop/job/mr/SqoopReducer.java | 4 +- .../mapreduce/MapreduceExecutionEngineTest.java | 3 + .../java/org/apache/sqoop/job/JobUtils.java | 14 +- .../org/apache/sqoop/job/TestHdfsExtract.java | 121 +++---- .../java/org/apache/sqoop/job/TestHdfsLoad.java | 58 ++- .../org/apache/sqoop/job/TestMapReduce.java | 47 ++- .../apache/sqoop/job/io/SqoopWritableTest.java | 91 +++++ .../mr/TestSqoopOutputFormatLoadExecutor.java | 54 +-- pom.xml | 11 +- spi/pom.xml | 5 + .../sqoop/connector/spi/SqoopConnector.java | 12 + .../mapreduce/MapreduceSubmissionEngine.java | 1 + 41 files changed, 1408 insertions(+), 268 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/common/pom.xml ---------------------------------------------------------------------- diff --git a/common/pom.xml b/common/pom.xml index db11b5b..9bfa07d 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -45,6 +45,11 @@ limitations under the License. 
<groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/common/src/main/java/org/apache/sqoop/etl/io/DataReader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/etl/io/DataReader.java b/common/src/main/java/org/apache/sqoop/etl/io/DataReader.java index 3e1adc7..a34dfb4 100644 --- a/common/src/main/java/org/apache/sqoop/etl/io/DataReader.java +++ b/common/src/main/java/org/apache/sqoop/etl/io/DataReader.java @@ -18,17 +18,32 @@ package org.apache.sqoop.etl.io; /** - * An intermediate layer for passing data from the MR framework + * An intermediate layer for passing data from the execution framework * to the ETL framework. */ public abstract class DataReader { + /** + * Read data from the execution framework as an object array. + * @return - array of objects with each column represented as an object + * @throws Exception + */ public abstract Object[] readArrayRecord() throws Exception; - public abstract String readCsvRecord() throws Exception; + /** + * Read data from execution framework as text - as a CSV record. + * public abstract Object readContent(int type) throws Exception; + * @return - CSV formatted data. + * @throws Exception + */ + public abstract String readTextRecord() throws Exception; - public abstract Object readContent(int type) throws Exception; - - public abstract void setFieldDelimiter(char fieldDelimiter); + /** + * Read data from execution framework as a native format. + * @return - the content in the native format of the intermediate data + * format being used. 
+ * @throws Exception + */ + public abstract Object readContent() throws Exception; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/common/src/main/java/org/apache/sqoop/etl/io/DataWriter.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/etl/io/DataWriter.java b/common/src/main/java/org/apache/sqoop/etl/io/DataWriter.java index d81364e..2166b09 100644 --- a/common/src/main/java/org/apache/sqoop/etl/io/DataWriter.java +++ b/common/src/main/java/org/apache/sqoop/etl/io/DataWriter.java @@ -23,12 +23,24 @@ package org.apache.sqoop.etl.io; */ public abstract class DataWriter { + /** + * Write an array of objects into the execution framework + * @param array - data to be written + */ public abstract void writeArrayRecord(Object[] array); - public abstract void writeCsvRecord(String csv); + /** + * Write data into execution framework as text. The Intermediate Data Format + * may choose to convert the data to another format based on how the data + * format is implemented + * @param text - data represented as CSV text. + */ + public abstract void writeStringRecord(String text); - public abstract void writeContent(Object content, int type); - - public abstract void setFieldDelimiter(char fieldDelimiter); + /** + * Write data in the intermediate data format's native format. 
+ * @param obj - data to be written + */ + public abstract void writeRecord(Object obj); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/common/src/main/java/org/apache/sqoop/schema/type/Column.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/schema/type/Column.java b/common/src/main/java/org/apache/sqoop/schema/type/Column.java index 8b630b2..30c26a3 100644 --- a/common/src/main/java/org/apache/sqoop/schema/type/Column.java +++ b/common/src/main/java/org/apache/sqoop/schema/type/Column.java @@ -98,4 +98,9 @@ public abstract class Column { result = 31 * result + (nullable != null ? nullable.hashCode() : 0); return result; } + + public boolean validate(Object o) { + // TODO: Implement this in all subclasses! + return true; + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java index e0da80f..298288e 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java @@ -21,6 +21,8 @@ import java.util.Locale; import java.util.ResourceBundle; import org.apache.sqoop.common.VersionInfo; +import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; +import org.apache.sqoop.connector.idf.IntermediateDataFormat; import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration; import 
org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration; @@ -61,7 +63,7 @@ public class GenericJdbcConnector extends SqoopConnector { @Override public ResourceBundle getBundle(Locale locale) { return ResourceBundle.getBundle( - GenericJdbcConnectorConstants.RESOURCE_BUNDLE_NAME, locale); + GenericJdbcConnectorConstants.RESOURCE_BUNDLE_NAME, locale); } @Override http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java index 2b1a0ad..c374750 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java @@ -78,6 +78,8 @@ public enum GenericJdbcConnectorError implements ErrorCode { GENERIC_JDBC_CONNECTOR_0018("Error occurred while transferring data from " + "stage table to destination table."), + + GENERIC_JDBC_CONNECTOR_0019("Table name extraction not supported.") ; private final String message; http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java index ef39cdc..80253be 100644 --- 
a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java @@ -17,6 +17,9 @@ */ package org.apache.sqoop.connector.jdbc; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; import java.util.LinkedList; import java.util.List; @@ -26,9 +29,11 @@ import org.apache.sqoop.common.MutableContext; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration; +import org.apache.sqoop.connector.jdbc.util.SqlTypesUtils; import org.apache.sqoop.job.etl.Initializer; import org.apache.sqoop.job.etl.InitializerContext; import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.Column; import org.apache.sqoop.utils.ClassUtils; public class GenericJdbcExportInitializer extends Initializer<ConnectionConfiguration, ExportJobConfiguration> { @@ -58,7 +63,53 @@ public class GenericJdbcExportInitializer extends Initializer<ConnectionConfigur @Override public Schema getSchema(InitializerContext context, ConnectionConfiguration connectionConfiguration, ExportJobConfiguration exportJobConfiguration) { - return null; + configureJdbcProperties(context.getContext(), connectionConfiguration, exportJobConfiguration); + + String schemaName = exportJobConfiguration.table.tableName; + + if (schemaName == null) { + throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0019, + "Table name extraction not supported yet."); + } + + if(exportJobConfiguration.table.schemaName != null) { + schemaName = exportJobConfiguration.table.schemaName + "." 
+ schemaName; + } + + Schema schema = new Schema(schemaName); + ResultSet rs = null; + ResultSetMetaData rsmt = null; + try { + rs = executor.executeQuery("SELECT * FROM " + schemaName + " WHERE 1 = 0"); + + rsmt = rs.getMetaData(); + for (int i = 1 ; i <= rsmt.getColumnCount(); i++) { + Column column = SqlTypesUtils.sqlTypeToAbstractType(rsmt.getColumnType(i)); + + String columnName = rsmt.getColumnName(i); + if (columnName == null || columnName.equals("")) { + columnName = rsmt.getColumnLabel(i); + if (null == columnName) { + columnName = "Column " + i; + } + } + + column.setName(columnName); + schema.addColumn(column); + } + + return schema; + } catch (SQLException e) { + throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0016, e); + } finally { + if(rs != null) { + try { + rs.close(); + } catch (SQLException e) { + LOG.info("Ignoring exception while closing ResultSet", e); + } + } + } } private void configureJdbcProperties(MutableContext context, ConnectionConfiguration connectionConfig, ExportJobConfiguration jobConfig) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java index 96818ba..2ad3cb2 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java @@ -71,16 +71,17 @@ public class GenericJdbcImportInitializer extends Initializer<ConnectionConfigur String schemaName = importJobConfiguration.table.tableName; if(schemaName == 
null) { schemaName = "Query"; + } else if(importJobConfiguration.table.schemaName != null) { + schemaName = importJobConfiguration.table.schemaName + "." + schemaName; } Schema schema = new Schema(schemaName); - ResultSet rs = null; ResultSetMetaData rsmt = null; try { rs = executor.executeQuery( - context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL) - .replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0") + context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL) + .replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0") ); rsmt = rs.getMetaData(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java index d4c4565..fc3ddd0 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java @@ -113,11 +113,6 @@ public class TestExportLoader { int index = 0; @Override - public void setFieldDelimiter(char fieldDelimiter) { - // do nothing and use default delimiter - } - - @Override public Object[] readArrayRecord() { if (index < numberOfRows) { Object[] array = new Object[] { @@ -132,16 +127,17 @@ public class TestExportLoader { } @Override - public String readCsvRecord() { + public String readTextRecord() { fail("This method should not be invoked."); return null; } @Override - public Object readContent(int type) { + public Object readContent() throws Exception { fail("This method should not be invoked."); return null; } + } } 
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java index a7ed6ba..30d0b9a 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java @@ -134,11 +134,6 @@ public class TestImportExtractor extends TestCase { int indx = START; @Override - public void setFieldDelimiter(char fieldDelimiter) { - // do nothing and use default delimiter - } - - @Override public void writeArrayRecord(Object[] array) { for (int i = 0; i < array.length; i++) { if (array[i] instanceof Integer) { @@ -153,12 +148,12 @@ public class TestImportExtractor extends TestCase { } @Override - public void writeCsvRecord(String csv) { + public void writeStringRecord(String text) { fail("This method should not be invoked."); } @Override - public void writeContent(Object content, int type) { + public void writeRecord(Object content) { fail("This method should not be invoked."); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java index a33fa36..cd05e30 100644 --- 
a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java @@ -327,7 +327,7 @@ public class TestImportInitializer extends TestCase { Initializer initializer = new GenericJdbcImportInitializer(); initializer.initialize(initializerContext, connConf, jobConf); Schema schema = initializer.getSchema(initializerContext, connConf, jobConf); - assertEquals(getSchema(tableName), schema); + assertEquals(getSchema(jobConf.table.schemaName + "." + tableName), schema); } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-sdk/pom.xml ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/pom.xml b/connector/connector-sdk/pom.xml index 4056e14..f54837d 100644 --- a/connector/connector-sdk/pom.xml +++ b/connector/connector-sdk/pom.xml @@ -38,6 +38,12 @@ limitations under the License. <artifactId>junit</artifactId> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.apache.sqoop</groupId> + <artifactId>sqoop-common</artifactId> + </dependency> + </dependencies> </project> http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java new file mode 100644 index 0000000..39d48c7 --- /dev/null +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sqoop.connector.idf; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.commons.lang.StringUtils; +import org.apache.log4j.Logger; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.Column; +import org.apache.sqoop.schema.type.FixedPoint; +import org.apache.sqoop.schema.type.FloatingPoint; +import org.apache.sqoop.schema.type.Type; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> { + + public static final char SEPARATOR_CHARACTER = ','; + public static final char ESCAPE_CHARACTER = '\\'; + public static final char QUOTE_CHARACTER = '\''; + + private static final Logger LOG = Logger.getLogger + (CSVIntermediateDataFormat.class); + + private static final char[] originals = { + 0x5C,0x00,0x0A,0x0D,0x1A,0x22,0x27 + }; + + private static final String[] replacements = { + new String(new char[] { ESCAPE_CHARACTER, 
'\\'}), + new String(new char[] { ESCAPE_CHARACTER, '0'}), + new String(new char[] { ESCAPE_CHARACTER, 'n'}), + new String(new char[] { ESCAPE_CHARACTER, 'r'}), + new String(new char[] { ESCAPE_CHARACTER, 'Z'}), + new String(new char[] { ESCAPE_CHARACTER, '\"'}), + new String(new char[] { ESCAPE_CHARACTER, '\''}) + }; + + // ISO-8859-1 is an 8-bit codec that is supported in every java implementation. + public static final String BYTE_FIELD_CHARSET = "ISO-8859-1"; + + private final List<Integer> stringFieldIndices = new ArrayList<Integer>(); + private final List<Integer> byteFieldIndices = new ArrayList<Integer>(); + + private Schema schema; + + /** + * {@inheritDoc} + */ + @Override + public String getTextData() { + return data; + } + + /** + * {@inheritDoc} + */ + @Override + public void setTextData(String text) { + this.data = text; + } + + /** + * {@inheritDoc} + */ + @Override + public Schema getSchema() { + return schema; + } + + /** + * {@inheritDoc} + */ + @Override + public void setSchema(Schema schema) { + if(schema == null) { + return; + } + this.schema = schema; + List<Column> columns = schema.getColumns(); + int i = 0; + for(Column col : columns) { + if(col.getType() == Type.TEXT) { + stringFieldIndices.add(i); + } else if(col.getType() == Type.BINARY) { + byteFieldIndices.add(i); + } + i++; + } + } + + /** + * Custom CSV parser that honors quoting and escaped quotes. + * All other escaping is handled elsewhere. 
+ * + * @return String[] + */ + private String[] getFields() { + if (data == null) { + return null; + } + + boolean quoted = false; + boolean escaped = false; + List<String> parsedData = new LinkedList<String>(); + StringBuffer buffer = new StringBuffer(); + for (int i = 0; i < data.length(); ++i) { + char c = data.charAt(i); + switch(c) { + case QUOTE_CHARACTER: + buffer.append(c); + if (escaped) { + escaped = false; + } else { + quoted = !quoted; + } + break; + + case ESCAPE_CHARACTER: + buffer.append(ESCAPE_CHARACTER); + escaped = !escaped; + break; + + case SEPARATOR_CHARACTER: + if (quoted) { + buffer.append(c); + } else { + parsedData.add(buffer.toString()); + buffer = new StringBuffer(); + } + break; + + default: + if (escaped) { + escaped = false; + } + buffer.append(c); + break; + } + } + parsedData.add(buffer.toString()); + + return parsedData.toArray(new String[parsedData.size()]); + } + + /** + * {@inheritDoc} + */ + @Override + public Object[] getObjectData() { + String[] fields = getFields(); + if (fields == null) { + return null; + } + + if (fields.length != schema.getColumns().size()) { + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005, + "The data " + getTextData() + " has the wrong number of fields."); + } + + Object[] out = new Object[fields.length]; + Column[] cols = schema.getColumns().toArray(new Column[fields.length]); + for (int i = 0; i < fields.length; i++) { + Type colType = cols[i].getType(); + if (fields[i].equals("NULL")) { + out[i] = null; + continue; + } + if (colType == Type.TEXT) { + out[i] = unescapeStrings(fields[i]); + } else if (colType == Type.BINARY) { + out[i] = unescapeByteArray(fields[i]); + } else if (colType == Type.FIXED_POINT) { + Long byteSize = ((FixedPoint) cols[i]).getByteSize(); + if (byteSize != null && byteSize <= Integer.SIZE) { + out[i] = Integer.valueOf(fields[i]); + } else { + out[i] = Long.valueOf(fields[i]); + } + } else if (colType == Type.FLOATING_POINT) { + Long 
byteSize = ((FloatingPoint) cols[i]).getByteSize(); + if (byteSize != null && byteSize <= Float.SIZE) { + out[i] = Float.valueOf(fields[i]); + } else { + out[i] = Double.valueOf(fields[i]); + } + } else if (colType == Type.DECIMAL) { + out[i] = new BigDecimal(fields[i]); + } else { + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004, "Column type from schema was not recognized for " + colType); + } + } + return out; + } + + /** + * {@inheritDoc} + */ + @VisibleForTesting + @Override + public void setObjectData(Object[] data) { + escapeArray(data); + this.data = StringUtils.join(data, SEPARATOR_CHARACTER); + } + + /** + * {@inheritDoc} + */ + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(this.data); + } + + /** + * {@inheritDoc} + */ + @Override + public void read(DataInput in) throws IOException { + data = in.readUTF(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean equals(Object other) { + if(this == other) { + return true; + } + if(other == null || !(other instanceof CSVIntermediateDataFormat)) { + return false; + } + return data.equals(((CSVIntermediateDataFormat)other).data); + } + + public int compareTo(IntermediateDataFormat<?> o) { + if(this == o) { + return 0; + } + if(this.equals(o)) { + return 0; + } + if(!(o instanceof CSVIntermediateDataFormat)) { + throw new IllegalStateException("Expected Data to be instance of " + + "CSVIntermediateFormat, but was an instance of " + o.getClass() + .getName()); + } + return data.compareTo(o.getTextData()); + } + + /** + * If the incoming data is an array, parse it and return the CSV-ised version + * + * @param array + */ + private void escapeArray(Object[] array) { + for (int i : stringFieldIndices) { + array[i] = escapeStrings((String) array[i]); + } + for (int i : byteFieldIndices) { + array[i] = escapeByteArrays((byte[]) array[i]); + } + } + + private String escapeByteArrays(byte[] bytes) { + try { + return escapeStrings(new 
String(bytes, BYTE_FIELD_CHARSET)); + } catch (UnsupportedEncodingException e) { + // We should never hit this case. + // This character set should be distributed with Java. + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The character set " + BYTE_FIELD_CHARSET + " is not available."); + } + } + + private String getRegExp(char orig) { + return getRegExp(String.valueOf(orig)); + } + + private String getRegExp(String orig) { + return orig.replaceAll("\\\\", Matcher.quoteReplacement("\\\\")); + } + + private String escapeStrings(String orig) { + int j = 0; + String replacement = orig; + try { + for (j = 0; j < replacements.length; j++) { + replacement = replacement.replaceAll(getRegExp(originals[j]), Matcher.quoteReplacement(replacements[j])); + } + } catch (Exception e) { + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0002, orig + " " + replacement + " " + String.valueOf(j) + " " + e.getMessage()); + } + StringBuilder builder = new StringBuilder(); + builder.append(QUOTE_CHARACTER).append(replacement).append(QUOTE_CHARACTER); + return builder.toString(); + } + + private String unescapeStrings(String orig) { + //Remove the trailing and starting quotes. + orig = orig.substring(1, orig.length() - 1); + int j = 0; + try { + for (j = 0; j < replacements.length; j++) { + orig = orig.replaceAll(getRegExp(replacements[j]), + Matcher.quoteReplacement(String.valueOf(originals[j]))); + } + } catch (Exception e) { + throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0003, orig + " " + String.valueOf(j) + e.getMessage()); + } + + return orig; + } + + private byte[] unescapeByteArray(String orig) { + // Always encoded in BYTE_FIELD_CHARSET. + try { + return unescapeStrings(orig).getBytes(BYTE_FIELD_CHARSET); + } catch (UnsupportedEncodingException e) { + // Should never hit this case. + // This character set should be distributed with Java. 
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The character set " + BYTE_FIELD_CHARSET + " is not available."); + } + } + + public String toString() { + return data; + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java new file mode 100644 index 0000000..91b594e --- /dev/null +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.sqoop.connector.idf; + +import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.Column; +import org.apache.sqoop.schema.type.Type; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Abstract class representing a pluggable intermediate data format the Sqoop + * framework will use to move data to/from the connector. All intermediate + * data formats are expected to have an internal/native implementation, + * but also should minimally be able to return a text (CSV) version of the + * data. The data format should also be able to return the data as an object + * array - each array representing one row. + * <p/> + * Why a "native" internal format and then return text too? + * Imagine a connector that moves data from a system that stores data as a + * serialization format called FooFormat. If I also need the data to be + * written into HDFS as FooFormat, the additional cycles burnt in converting + * the FooFormat to text and back is useless - so plugging in an intermediate + * format that can store the data as FooFormat saves those cycles! + * <p/> + * Most fast access mechanisms, like mysqldump or pgsqldump write the data + * out as CSV, and most often the destination data is also represented as CSV + * - so having a minimal CSV support is important, so we can easily pull the + * data out as text. + * <p/> + * Any conversion to the final format from the native or text format is to be + * done by the connector or OutputFormat classes. + * + * @param <T> - Each data format may have a native representation of the + * data, represented by the parameter. + */ +public abstract class IntermediateDataFormat<T> { + + protected volatile T data; + + public int hashCode() { + return data.hashCode(); + } + + /** + * Set one row of data. If validate is set to true, the data is validated + * against the schema. 
+ * + * @param data - A single row of data to be moved. + */ + public void setData(T data) { + this.data = data; + } + + /** + * Get one row of data. + * + * @return - One row of data, represented in the internal/native format of + * the intermediate data format implementation. + */ + public T getData() { + return data; + } + + /** + * Get one row of data as CSV. + * + * @return - String representing the data in CSV + */ + public abstract String getTextData(); + + /** + * Set one row of data as CSV. + * + */ + public abstract void setTextData(String text); + + /** + * Get one row of data as an Object array. + * + * @return - Object array representing one row of the data + */ + public abstract Object[] getObjectData(); + + /** + * Set one row of data as an Object array. + * + */ + public abstract void setObjectData(Object[] data); + + /** + * Set the schema to be used. + * + * @param schema - the schema to be used + */ + public abstract void setSchema(Schema schema); + + /** + * Get the schema of the data. + * + * @return - The schema of the data. + */ + public abstract Schema getSchema(); + + /** + * Serialize the fields of this object to <code>out</code>. + * + * @param out <code>DataOutput</code> to serialize this object into. + * @throws IOException + */ + public abstract void write(DataOutput out) throws IOException; + + /** + * Deserialize the fields of this object from <code>in</code>. + * + * <p>For efficiency, implementations should attempt to re-use storage in the + * existing object where possible.</p> + * + * @param in <code>DataInput</code> to deserialize this object from.
+ * @throws IOException + */ + public abstract void read(DataInput in) throws IOException; +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java new file mode 100644 index 0000000..9219074 --- /dev/null +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sqoop.connector.idf; + +import org.apache.sqoop.common.ErrorCode; + +public enum IntermediateDataFormatError implements ErrorCode { + /** An unknown error has occurred. */ + INTERMEDIATE_DATA_FORMAT_0000("An unknown error has occurred."), + + /** An encoding is missing in the Java native libraries. */ + INTERMEDIATE_DATA_FORMAT_0001("Native character set error."), + + /** Error while escaping a row. 
*/ + INTERMEDIATE_DATA_FORMAT_0002("An error has occurred while escaping a row."), + + /** Error while unescaping a row. */ + INTERMEDIATE_DATA_FORMAT_0003("An error has occurred while unescaping a row."), + + /** Column type isn't known by Intermediate Data Format. */ + INTERMEDIATE_DATA_FORMAT_0004("Unknown column type."), + + /** Wrong number of fields. */ + INTERMEDIATE_DATA_FORMAT_0005("Wrong number of fields.") + + ; + + private final String message; + + private IntermediateDataFormatError(String message) { + this.message = message; + } + + public String getCode() { + return name(); + } + + public String getMessage() { + return message; + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatTest.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatTest.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatTest.java new file mode 100644 index 0000000..df6d30f --- /dev/null +++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatTest.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sqoop.connector.idf; + +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.Binary; +import org.apache.sqoop.schema.type.FixedPoint; +import org.apache.sqoop.schema.type.Text; +import org.junit.Before; +import org.junit.Test; + +import java.io.UnsupportedEncodingException; +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class CSVIntermediateDataFormatTest { + + private final String BYTE_FIELD_ENCODING = "ISO-8859-1"; + + private IntermediateDataFormat<?> data; + + @Before + public void setUp() { + data = new CSVIntermediateDataFormat(); + } + + private String getByteFieldString(byte[] byteFieldData) { + try { + return new StringBuilder("'").append(new String(byteFieldData, BYTE_FIELD_ENCODING)).append("'").toString(); + } catch(UnsupportedEncodingException e) { + // Should never get to this point because ISO-8859-1 is a standard codec. 
+ return null; + } + } + + @Test + public void testStringInStringOut() { + String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54}) + + ",'" + String.valueOf(0x0A) + "'"; + data.setTextData(testData); + assertEquals(testData, data.getTextData()); + } + + @Test + public void testNullStringInObjectOut() { + Schema schema = new Schema("test"); + schema.addColumn(new FixedPoint("1")) + .addColumn(new FixedPoint("2")) + .addColumn(new Text("3")) + .addColumn(new Text("4")) + .addColumn(new Binary("5")) + .addColumn(new Text("6")); + data.setSchema(schema); + data.setTextData(null); + + Object[] out = data.getObjectData(); + + assertNull(out); + } + + @Test(expected=SqoopException.class) + public void testEmptyStringInObjectOut() { + Schema schema = new Schema("test"); + schema.addColumn(new FixedPoint("1")) + .addColumn(new FixedPoint("2")) + .addColumn(new Text("3")) + .addColumn(new Text("4")) + .addColumn(new Binary("5")) + .addColumn(new Text("6")); + data.setSchema(schema); + data.setTextData(""); + + data.getObjectData(); + } + + @Test + public void testStringInObjectOut() { + + //byte[0] = -112, byte[1] = 54 - 2's complements + String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54}) + + ",'\\n'"; + Schema schema = new Schema("test"); + schema.addColumn(new FixedPoint("1")) + .addColumn(new FixedPoint("2")) + .addColumn(new Text("3")) + .addColumn(new Text("4")) + .addColumn(new Binary("5")) + .addColumn(new Text("6")); + data.setSchema(schema); + data.setTextData(testData); + + Object[] out = data.getObjectData(); + + assertEquals(new Long(10),out[0]); + assertEquals(new Long(34),out[1]); + assertEquals("54",out[2]); + assertEquals("random data",out[3]); + assertEquals(-112, ((byte[])out[4])[0]); + assertEquals(54, ((byte[])out[4])[1]); + assertEquals("\n", out[5].toString()); + } + + @Test + public void testObjectInStringOut() { + Schema schema = new Schema("test"); 
+ schema.addColumn(new FixedPoint("1")) + .addColumn(new FixedPoint("2")) + .addColumn(new Text("3")) + .addColumn(new Text("4")) + .addColumn(new Binary("5")) + .addColumn(new Text("6")); + data.setSchema(schema); + + byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54}; + Object[] in = new Object[6]; + in[0] = new Long(10); + in[1] = new Long(34); + in[2] = "54"; + in[3] = "random data"; + in[4] = byteFieldData; + in[5] = new String(new char[] { 0x0A }); + + data.setObjectData(in); + + //byte[0] = \r, byte[1] = -112, byte[2] = 54 - 2's complements + String testData = "10,34,'54','random data'," + + getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n'"; + assertEquals(testData, data.getTextData()); + } + + @Test + public void testObjectInObjectOut() { + //Test escapable sequences too. + //byte[0] = -112, byte[1] = 54 - 2's complements + Schema schema = new Schema("test"); + schema.addColumn(new FixedPoint("1")) + .addColumn(new FixedPoint("2")) + .addColumn(new Text("3")) + .addColumn(new Text("4")) + .addColumn(new Binary("5")) + .addColumn(new Text("6")); + data.setSchema(schema); + + Object[] in = new Object[6]; + in[0] = new Long(10); + in[1] = new Long(34); + in[2] = "54"; + in[3] = "random data"; + in[4] = new byte[] { (byte) -112, (byte) 54}; + in[5] = new String(new char[] { 0x0A }); + Object[] inCopy = new Object[6]; + System.arraycopy(in,0,inCopy,0,in.length); + + // Modifies the input array, so we use the copy to confirm + data.setObjectData(in); + + assertTrue(Arrays.deepEquals(inCopy, data.getObjectData())); + } + + @Test + public void testStringFullRangeOfCharacters() { + Schema schema = new Schema("test"); + schema.addColumn(new Text("1")); + data.setSchema(schema); + + char[] allCharArr = new char[256]; + for(int i = 0; i < allCharArr.length; ++i) { + allCharArr[i] = (char)i; + } + String strData = new String(allCharArr); + + Object[] in = {strData}; + Object[] inCopy = new Object[1]; +
System.arraycopy(in,0,inCopy,0,in.length); + + // Modifies the input array, so we use the copy to confirm + data.setObjectData(in); + + assertEquals(strData, data.getObjectData()[0]); + assertTrue(Arrays.deepEquals(inCopy, data.getObjectData())); + } + + @Test + public void testByteArrayFullRangeOfCharacters() { + Schema schema = new Schema("test"); + schema.addColumn(new Binary("1")); + data.setSchema(schema); + + byte[] allCharByteArr = new byte[256]; + for(int i = 0; i < allCharByteArr.length; ++i) { + allCharByteArr[i] = (byte)i; + } + + Object[] in = {allCharByteArr}; + Object[] inCopy = new Object[1]; + System.arraycopy(in,0,inCopy,0,in.length); + + // Modifies the input array, so we use the copy to confirm + data.setObjectData(in); + assertTrue(Arrays.deepEquals(inCopy, data.getObjectData())); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/core/src/main/java/org/apache/sqoop/framework/JobManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java index e052584..1700432 100644 --- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java +++ b/core/src/main/java/org/apache/sqoop/framework/JobManager.java @@ -22,6 +22,7 @@ import org.apache.sqoop.common.MapContext; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.ConnectorManager; import org.apache.sqoop.request.HttpEventContext; +import org.apache.sqoop.connector.idf.IntermediateDataFormat; import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.core.Reconfigurable; import org.apache.sqoop.core.SqoopConfiguration; @@ -327,6 +328,10 @@ public class JobManager implements Reconfigurable { request.setJobName(job.getName()); request.setJobId(job.getPersistenceId()); request.setNotificationUrl(notificationBaseUrl + jobId); + Class<? 
extends IntermediateDataFormat<?>> dataFormatClass = + connector.getIntermediateDataFormat(); + request.setIntermediateDataFormat(connector.getIntermediateDataFormat()); + // Create request object // Let's register all important jars // sqoop-common @@ -343,6 +348,9 @@ public class JobManager implements Reconfigurable { // Extra libraries that Sqoop code requires request.addJarForClass(JSONValue.class); + // The IDF is used in the ETL process. + request.addJarForClass(dataFormatClass); + // Get connector callbacks switch (job.getType()) { case IMPORT: http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java index a138db5..7900eee 100644 --- a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java +++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java @@ -18,6 +18,7 @@ package org.apache.sqoop.framework; import org.apache.sqoop.common.MutableMapContext; +import org.apache.sqoop.connector.idf.IntermediateDataFormat; import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.job.etl.CallbackBase; import org.apache.sqoop.model.MJob; @@ -107,6 +108,11 @@ public class SubmissionRequest { */ Integer loaders; + /** + * The intermediate data format this submission should use. + */ + Class<? extends IntermediateDataFormat> intermediateDataFormat; + public SubmissionRequest() { this.jars = new LinkedList<String>(); this.connectorContext = new MutableMapContext(); @@ -252,4 +258,13 @@ public class SubmissionRequest { public void setLoaders(Integer loaders) { this.loaders = loaders; } + + public Class<? 
extends IntermediateDataFormat> getIntermediateDataFormat() { + return intermediateDataFormat; + } + + public void setIntermediateDataFormat(Class<? extends IntermediateDataFormat> intermediateDataFormat) { + this.intermediateDataFormat = intermediateDataFormat; + } + } http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/pom.xml ---------------------------------------------------------------------- diff --git a/execution/mapreduce/pom.xml b/execution/mapreduce/pom.xml index f9a2a0e..b23b905 100644 --- a/execution/mapreduce/pom.xml +++ b/execution/mapreduce/pom.xml @@ -52,6 +52,10 @@ limitations under the License. <artifactId>mockito-all</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> <!-- See profiles for Hadoop specific dependencies --> http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java index 5c0a027..84f6213 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java @@ -34,7 +34,7 @@ import org.apache.sqoop.job.etl.HdfsExportPartitioner; import org.apache.sqoop.job.etl.HdfsSequenceImportLoader; import org.apache.sqoop.job.etl.HdfsTextImportLoader; import org.apache.sqoop.job.etl.Importer; -import org.apache.sqoop.job.io.Data; +import org.apache.sqoop.job.io.SqoopWritable; import org.apache.sqoop.job.mr.SqoopFileOutputFormat; import org.apache.sqoop.job.mr.SqoopInputFormat; import 
org.apache.sqoop.job.mr.SqoopMapper; @@ -53,14 +53,7 @@ public class MapreduceExecutionEngine extends ExecutionEngine { return new MRSubmissionRequest(); } - /** - * {@inheritDoc} - */ - @Override - public void prepareImportSubmission(SubmissionRequest gRequest) { - MRSubmissionRequest request = (MRSubmissionRequest) gRequest; - ImportJobConfiguration jobConf = (ImportJobConfiguration) request.getConfigFrameworkJob(); - + public void prepareSubmission(MRSubmissionRequest request) { // Add jar dependencies addDependencies(request); @@ -68,13 +61,35 @@ public class MapreduceExecutionEngine extends ExecutionEngine { request.setInputFormatClass(SqoopInputFormat.class); request.setMapperClass(SqoopMapper.class); - request.setMapOutputKeyClass(Data.class); + request.setMapOutputKeyClass(SqoopWritable.class); request.setMapOutputValueClass(NullWritable.class); - request.setOutputFormatClass(SqoopFileOutputFormat.class); - request.setOutputKeyClass(Data.class); + request.setOutputFormatClass(SqoopNullOutputFormat.class); + request.setOutputKeyClass(SqoopWritable.class); request.setOutputValueClass(NullWritable.class); + // Set up framework context + MutableMapContext context = request.getFrameworkContext(); + context.setString(JobConstants.INTERMEDIATE_DATA_FORMAT, + request.getIntermediateDataFormat().getName()); + + if(request.getExtractors() != null) { + context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors()); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void prepareImportSubmission(SubmissionRequest gRequest) { + MRSubmissionRequest request = (MRSubmissionRequest) gRequest; + + prepareSubmission(request); + request.setOutputFormatClass(SqoopFileOutputFormat.class); + + ImportJobConfiguration jobConf = (ImportJobConfiguration) request.getConfigFrameworkJob(); + Importer importer = (Importer)request.getConnectorCallbacks(); // Set up framework context @@ -83,10 +98,6 @@ public class MapreduceExecutionEngine extends ExecutionEngine { 
context.setString(JobConstants.JOB_ETL_EXTRACTOR, importer.getExtractor().getName()); context.setString(JobConstants.JOB_ETL_DESTROYER, importer.getDestroyer().getName()); - if(request.getExtractors() != null) { - context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors()); - } - // TODO: This settings should be abstracted to core module at some point if(jobConf.output.outputFormat == OutputFormat.TEXT_FILE) { context.setString(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName()); @@ -137,19 +148,7 @@ public class MapreduceExecutionEngine extends ExecutionEngine { MRSubmissionRequest request = (MRSubmissionRequest) gRequest; ExportJobConfiguration jobConf = (ExportJobConfiguration) request.getConfigFrameworkJob(); - // Add jar dependencies - addDependencies(request); - - // Configure map-reduce classes for import - request.setInputFormatClass(SqoopInputFormat.class); - - request.setMapperClass(SqoopMapper.class); - request.setMapOutputKeyClass(Data.class); - request.setMapOutputValueClass(NullWritable.class); - - request.setOutputFormatClass(SqoopNullOutputFormat.class); - request.setOutputKeyClass(Data.class); - request.setOutputValueClass(NullWritable.class); + prepareSubmission(request); Exporter exporter = (Exporter)request.getConnectorCallbacks(); @@ -162,10 +161,6 @@ public class MapreduceExecutionEngine extends ExecutionEngine { // Extractor that will be able to read all supported file types context.setString(JobConstants.JOB_ETL_EXTRACTOR, HdfsExportExtractor.class.getName()); context.setString(JobConstants.HADOOP_INPUTDIR, jobConf.input.inputDirectory); - - if(request.getExtractors() != null) { - context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors()); - } } /** http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java ---------------------------------------------------------------------- diff --git 
a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java index 7fd9a01..b2fa15d 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java @@ -18,6 +18,7 @@ package org.apache.sqoop.job; import org.apache.sqoop.core.ConfigurationConstants; +import org.apache.sqoop.framework.FrameworkConstants; public final class JobConstants extends Constants { /** @@ -66,6 +67,9 @@ public final class JobConstants extends Constants { public static final String HADOOP_COMPRESS_CODEC = "mapred.output.compression.codec"; + public static final String INTERMEDIATE_DATA_FORMAT = + FrameworkConstants.PREFIX_EXECUTION_CONFIG + "intermediate.format"; + private JobConstants() { // Disable explicit object creation } http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java index 1978ec6..43e6463 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java @@ -36,7 +36,6 @@ import org.apache.sqoop.framework.configuration.ConnectionConfiguration; import org.apache.sqoop.framework.configuration.ExportJobConfiguration; import org.apache.sqoop.job.MapreduceExecutionError; import org.apache.sqoop.job.PrefixContext; -import org.apache.sqoop.job.io.Data; /** * Extract from HDFS. 
@@ -50,12 +49,6 @@ public class HdfsExportExtractor extends Extractor<ConnectionConfiguration, Expo private DataWriter dataWriter; private long rowRead = 0; - private final char fieldDelimiter; - - public HdfsExportExtractor() { - fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER; - } - @Override public void extract(ExtractorContext context, ConnectionConfiguration connectionConfiguration, @@ -63,7 +56,6 @@ public class HdfsExportExtractor extends Extractor<ConnectionConfiguration, Expo conf = ((PrefixContext) context.getContext()).getConfiguration(); dataWriter = context.getDataWriter(); - dataWriter.setFieldDelimiter(fieldDelimiter); try { HdfsExportPartition p = partition; @@ -113,7 +105,7 @@ public class HdfsExportExtractor extends Extractor<ConnectionConfiguration, Expo boolean hasNext = filereader.next(line); while (hasNext) { rowRead++; - dataWriter.writeCsvRecord(line.toString()); + dataWriter.writeStringRecord(line.toString()); line = new Text(); hasNext = filereader.next(line); if (filereader.getPosition() >= end && filereader.syncSeen()) { @@ -173,7 +165,7 @@ public class HdfsExportExtractor extends Extractor<ConnectionConfiguration, Expo next = fileseeker.getPos(); } rowRead++; - dataWriter.writeCsvRecord(line.toString()); + dataWriter.writeStringRecord(line.toString()); } LOG.info("Extracting ended on position: " + fileseeker.getPos()); filestream.close(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java index a07c511..d4ffb13 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java +++ 
b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java @@ -30,7 +30,6 @@ import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.MapreduceExecutionError; -import org.apache.sqoop.job.io.Data; import org.apache.sqoop.etl.io.DataReader; import org.apache.sqoop.utils.ClassUtils; @@ -38,16 +37,9 @@ public class HdfsSequenceImportLoader extends Loader { public static final String EXTENSION = ".seq"; - private final char fieldDelimiter; - - public HdfsSequenceImportLoader() { - fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER; - } - @Override public void load(LoaderContext context, Object oc, Object oj) throws Exception { DataReader reader = context.getDataReader(); - reader.setFieldDelimiter(fieldDelimiter); Configuration conf = new Configuration(); // Configuration conf = ((EtlContext)context).getConfiguration(); @@ -87,7 +79,7 @@ public class HdfsSequenceImportLoader extends Loader { String csv; Text text = new Text(); - while ((csv = reader.readCsvRecord()) != null) { + while ((csv = reader.readTextRecord()) != null) { text.set(csv); filewriter.append(text, NullWritable.get()); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java index 4621942..7b799ca 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java @@ -22,6 +22,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStreamWriter; +import 
com.google.common.base.Charsets; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -36,18 +37,15 @@ import org.apache.sqoop.utils.ClassUtils; public class HdfsTextImportLoader extends Loader { - private final char fieldDelimiter; private final char recordDelimiter; public HdfsTextImportLoader() { - fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER; recordDelimiter = Data.DEFAULT_RECORD_DELIMITER; } @Override public void load(LoaderContext context, Object oc, Object oj) throws Exception{ DataReader reader = context.getDataReader(); - reader.setFieldDelimiter(fieldDelimiter); Configuration conf = new Configuration(); // Configuration conf = ((EtlContext)context).getConfiguration(); @@ -81,15 +79,15 @@ public class HdfsTextImportLoader extends Loader { DataOutputStream filestream = fs.create(filepath, false); if (codec != null) { filewriter = new BufferedWriter(new OutputStreamWriter( - codec.createOutputStream(filestream, codec.createCompressor()), - Data.CHARSET_NAME)); + codec.createOutputStream(filestream, codec.createCompressor()), + Charsets.UTF_8)); } else { filewriter = new BufferedWriter(new OutputStreamWriter( - filestream, Data.CHARSET_NAME)); + filestream, Charsets.UTF_8)); } String csv; - while ((csv = reader.readCsvRecord()) != null) { + while ((csv = reader.readTextRecord()) != null) { filewriter.write(csv + recordDelimiter); } filewriter.close(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java new file mode 100644 index 0000000..ed118d2 --- /dev/null +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java @@ -0,0 +1,59 @@ +/* + 
* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sqoop.job.io; + +import org.apache.hadoop.io.WritableComparable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class SqoopWritable implements WritableComparable<SqoopWritable> { + private String strData; + + public SqoopWritable() {} + + public void setString(String data) { + strData = data; + } + + public String getString() { + return strData; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(strData); + } + + @Override + public void readFields(DataInput in) throws IOException { + strData = in.readUTF(); + } + + @Override + public int compareTo(SqoopWritable o) { + return strData.compareTo(o.getString()); + } + + @Override + public String toString() { + return getString(); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java 
b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java index 356ae8a..bbf7342 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java @@ -34,13 +34,13 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.log4j.Logger; import org.apache.sqoop.job.JobConstants; -import org.apache.sqoop.job.io.Data; +import org.apache.sqoop.job.io.SqoopWritable; /** * An output format for MapReduce job. */ public class SqoopFileOutputFormat - extends FileOutputFormat<Data, NullWritable> { + extends FileOutputFormat<SqoopWritable, NullWritable> { public static final Logger LOG = Logger.getLogger(SqoopFileOutputFormat.class); @@ -49,7 +49,7 @@ public class SqoopFileOutputFormat DefaultCodec.class; @Override - public RecordWriter<Data, NullWritable> getRecordWriter( + public RecordWriter<SqoopWritable, NullWritable> getRecordWriter( TaskAttemptContext context) throws IOException { Configuration conf = context.getConfiguration(); @@ -69,6 +69,7 @@ public class SqoopFileOutputFormat return executor.getRecordWriter(); } + @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException { Path output = getOutputPath(context); return new DestroyerFileOutputCommitter(output, context); http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java index 92de37e..645dbc6 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java +++ 
b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java @@ -27,21 +27,22 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Mapper; import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.idf.IntermediateDataFormat; import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.MapreduceExecutionError; import org.apache.sqoop.job.PrefixContext; import org.apache.sqoop.job.etl.Extractor; import org.apache.sqoop.job.etl.ExtractorContext; -import org.apache.sqoop.job.io.Data; import org.apache.sqoop.etl.io.DataWriter; import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.job.io.SqoopWritable; import org.apache.sqoop.submission.counter.SqoopCounters; import org.apache.sqoop.utils.ClassUtils; /** * A mapper to perform map function. */ -public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, Data, NullWritable> { +public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable> { static { ConfigurationUtils.configureLogging(); @@ -52,6 +53,8 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, Data, NullWrit * Service for reporting progress to mapreduce. 
*/ private final ScheduledExecutorService progressService = Executors.newSingleThreadScheduledExecutor(); + private IntermediateDataFormat data = null; + private SqoopWritable dataOut = null; @Override public void run(Context context) throws IOException, InterruptedException { @@ -60,6 +63,12 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, Data, NullWrit String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR); Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName); + String intermediateDataFormatName = conf.get(JobConstants + .INTERMEDIATE_DATA_FORMAT); + data = (IntermediateDataFormat) ClassUtils.instantiate(intermediateDataFormatName); + data.setSchema(ConfigurationUtils.getConnectorSchema(conf)); + dataOut = new SqoopWritable(); + // Objects that should be pass to the Executor execution PrefixContext subContext = null; Object configConnection = null; @@ -109,46 +118,38 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, Data, NullWrit } } - public class MapDataWriter extends DataWriter { + private class MapDataWriter extends DataWriter { private Context context; - private Data data; public MapDataWriter(Context context) { this.context = context; } @Override - public void setFieldDelimiter(char fieldDelimiter) { - if (data == null) { - data = new Data(); - } - - data.setFieldDelimiter(fieldDelimiter); - } - - @Override public void writeArrayRecord(Object[] array) { - writeContent(array, Data.ARRAY_RECORD); + data.setObjectData(array); + writeContent(); } @Override - public void writeCsvRecord(String csv) { - writeContent(csv, Data.CSV_RECORD); + public void writeStringRecord(String text) { + data.setTextData(text); + writeContent(); } @Override - public void writeContent(Object content, int type) { - if (data == null) { - data = new Data(); - } + public void writeRecord(Object obj) { + data.setData(obj.toString()); + writeContent(); + } - data.setContent(content, type); + private void writeContent() { 
try { - context.write(data, NullWritable.get()); + dataOut.setString(data.getTextData()); + context.write(dataOut, NullWritable.get()); } catch (Exception e) { throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0013, e); } } } - } http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java index 90de6ef..b3461bb 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java @@ -28,14 +28,14 @@ import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.log4j.Logger; import org.apache.sqoop.job.JobConstants; -import org.apache.sqoop.job.io.Data; +import org.apache.sqoop.job.io.SqoopWritable; import java.io.IOException; /** * An output format for MapReduce job. 
*/ -public class SqoopNullOutputFormat extends OutputFormat<Data, NullWritable> { +public class SqoopNullOutputFormat extends OutputFormat<SqoopWritable, NullWritable> { public static final Logger LOG = Logger.getLogger(SqoopNullOutputFormat.class); @@ -46,7 +46,7 @@ public class SqoopNullOutputFormat extends OutputFormat<Data, NullWritable> { } @Override - public RecordWriter<Data, NullWritable> getRecordWriter( + public RecordWriter<SqoopWritable, NullWritable> getRecordWriter( TaskAttemptContext context) { SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(context); http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java index 7dedee9..6efadf6 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java @@ -31,14 +31,16 @@ import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; +import org.apache.sqoop.connector.idf.IntermediateDataFormat; import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.MapreduceExecutionError; import org.apache.sqoop.job.PrefixContext; import org.apache.sqoop.job.etl.Loader; import org.apache.sqoop.job.etl.LoaderContext; -import org.apache.sqoop.job.io.Data; import org.apache.sqoop.etl.io.DataReader; import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.job.io.SqoopWritable; import 
org.apache.sqoop.utils.ClassUtils; public class SqoopOutputFormatLoadExecutor { @@ -48,7 +50,7 @@ public class SqoopOutputFormatLoadExecutor { private volatile boolean readerFinished = false; private volatile boolean writerFinished = false; - private volatile Data data; + private volatile IntermediateDataFormat data; private JobContext context; private SqoopRecordWriter producer; private Future<?> consumerFuture; @@ -60,17 +62,19 @@ public class SqoopOutputFormatLoadExecutor { SqoopOutputFormatLoadExecutor(boolean isTest, String loaderName){ this.isTest = isTest; this.loaderName = loaderName; - data = new Data(); + data = new CSVIntermediateDataFormat(); producer = new SqoopRecordWriter(); } public SqoopOutputFormatLoadExecutor(JobContext jobctx) { - data = new Data(); context = jobctx; producer = new SqoopRecordWriter(); + data = (IntermediateDataFormat) ClassUtils.instantiate(context + .getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT)); + data.setSchema(ConfigurationUtils.getConnectorSchema(context.getConfiguration())); } - public RecordWriter<Data, NullWritable> getRecordWriter() { + public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() { consumerFuture = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat ("OutputFormatLoader-consumer").build()).submit( new ConsumerThread()); @@ -81,14 +85,13 @@ public class SqoopOutputFormatLoadExecutor { * This is a producer-consumer problem and can be solved * with two semaphores. 
*/ - private class SqoopRecordWriter extends RecordWriter<Data, NullWritable> { + private class SqoopRecordWriter extends RecordWriter<SqoopWritable, NullWritable> { @Override - public void write(Data key, NullWritable value) throws InterruptedException { + public void write(SqoopWritable key, NullWritable value) throws InterruptedException { free.acquire(); checkIfConsumerThrew(); - int type = key.getType(); - data.setContent(key.getContent(type), type); + data.setTextData(key.getString()); filled.release(); } @@ -135,48 +138,68 @@ public class SqoopOutputFormatLoadExecutor { } private class OutputFormatDataReader extends DataReader { - @Override - public void setFieldDelimiter(char fieldDelimiter) { - data.setFieldDelimiter(fieldDelimiter); - } @Override public Object[] readArrayRecord() throws InterruptedException { - return (Object[])readContent(Data.ARRAY_RECORD); + acquireSema(); + // If the writer has finished, there is definitely no data remaining + if (writerFinished) { + return null; + } + try { + return data.getObjectData(); + } finally { + releaseSema(); + } } @Override - public String readCsvRecord() throws InterruptedException { - return (String)readContent(Data.CSV_RECORD); + public String readTextRecord() throws InterruptedException { + acquireSema(); + // If the writer has finished, there is definitely no data remaining + if (writerFinished) { + return null; + } + try { + return data.getTextData(); + } finally { + releaseSema(); + } } @Override - public Object readContent(int type) throws InterruptedException { - // Has any more data been produced after I last consumed. - // If no, wait for the producer to produce. - try { - filled.acquire(); - } catch (InterruptedException ex) { - //Really at this point, there is nothing to do. 
Just throw and get out - LOG.error("Interrupted while waiting for data to be available from " + - "mapper", ex); - throw ex; - } - // If the writer has finished, there is definitely no data remaining + public Object readContent() throws InterruptedException { + acquireSema(); if (writerFinished) { return null; } try { - Object content = data.getContent(type); - return content; + return data.getData(); } catch (Throwable t) { readerFinished = true; LOG.error("Caught exception e while getting content ", t); throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t); } finally { - free.release(); + releaseSema(); + } + } + + private void acquireSema() throws InterruptedException { + // Has any more data been produced after I last consumed. + // If no, wait for the producer to produce. + try { + filled.acquire(); + } catch (InterruptedException ex) { + //Really at this point, there is nothing to do. Just throw and get out + LOG.error("Interrupted while waiting for data to be available from " + + "mapper", ex); + throw ex; } } + + private void releaseSema(){ + free.release(); + } } private class ConsumerThread implements Runnable { http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java index 98a2c51..a55534a 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java @@ -20,7 +20,7 @@ package org.apache.sqoop.job.mr; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; import org.apache.log4j.Logger; -import org.apache.sqoop.job.io.Data; +import org.apache.sqoop.job.io.SqoopWritable; import 
java.io.IOException; import java.util.concurrent.Executors; @@ -30,7 +30,7 @@ import java.util.concurrent.TimeUnit; /** * A reducer to perform reduce function. */ -public class SqoopReducer extends Reducer<Data, NullWritable, Data, NullWritable> { +public class SqoopReducer extends Reducer<SqoopWritable, NullWritable, SqoopWritable, NullWritable> { static { ConfigurationUtils.configureLogging(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java b/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java index 39d1b53..a849394 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java @@ -18,6 +18,7 @@ package org.apache.sqoop.execution.mapreduce; import org.apache.sqoop.common.MutableMapContext; +import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; import org.apache.sqoop.framework.SubmissionRequest; import org.apache.sqoop.framework.configuration.ImportJobConfiguration; import org.apache.sqoop.framework.configuration.OutputCompression; @@ -71,6 +72,7 @@ public class MapreduceExecutionEngineTest { request.setConnectorCallbacks(new Importer(Initializer.class, Partitioner.class, Extractor.class, Destroyer.class) { }); + request.setIntermediateDataFormat(CSVIntermediateDataFormat.class); executionEngine.prepareImportSubmission(request); MutableMapContext context = request.getFrameworkContext(); @@ -97,6 +99,7 @@ public class MapreduceExecutionEngineTest { request.setConnectorCallbacks(new Importer(Initializer.class, Partitioner.class, 
Extractor.class, Destroyer.class) { }); + request.setIntermediateDataFormat(CSVIntermediateDataFormat.class); executionEngine.prepareImportSubmission(request); MutableMapContext context = request.getFrameworkContext(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java index e21f15b..09e5ec5 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java @@ -27,7 +27,7 @@ import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.sqoop.job.io.Data; +import org.apache.sqoop.job.io.SqoopWritable; import org.apache.sqoop.job.mr.SqoopFileOutputFormat; import org.apache.sqoop.job.mr.SqoopInputFormat; import org.apache.sqoop.job.mr.SqoopMapper; @@ -44,17 +44,17 @@ public class JobUtils { } public static void runJob(Configuration conf, - Class<? extends InputFormat<SqoopSplit, NullWritable>> input, - Class<? extends Mapper<SqoopSplit, NullWritable, Data, NullWritable>> mapper, - Class<? extends OutputFormat<Data, NullWritable>> output) - throws IOException, InterruptedException, ClassNotFoundException { + Class<? extends InputFormat<SqoopSplit, NullWritable>> input, + Class<? extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable>> mapper, + Class<? 
extends OutputFormat<SqoopWritable, NullWritable>> output) + throws IOException, InterruptedException, ClassNotFoundException { Job job = new Job(conf); job.setInputFormatClass(input); job.setMapperClass(mapper); - job.setMapOutputKeyClass(Data.class); + job.setMapOutputKeyClass(SqoopWritable.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputFormatClass(output); - job.setOutputKeyClass(Data.class); + job.setOutputKeyClass(SqoopWritable.class); job.setOutputValueClass(NullWritable.class); boolean success = job.waitForCompletion(true);
