Repository: sqoop Updated Branches: refs/heads/trunk 92e2f9992 -> d2bdef496
SQOOP-3139: sqoop tries to re execute select query during import in case of a connection reset error and this is causing lots of duplicate records from source (Zoltan Toth via Anna Szonyi) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/d2bdef49 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/d2bdef49 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/d2bdef49 Branch: refs/heads/trunk Commit: d2bdef49669c400691d35158562dab9adc12a527 Parents: 92e2f99 Author: Anna Szonyi <[email protected]> Authored: Tue Aug 22 10:58:40 2017 +0200 Committer: Anna Szonyi <[email protected]> Committed: Tue Aug 22 10:58:40 2017 +0200 ---------------------------------------------------------------------- .../sqoop/mapreduce/db/DBRecordReader.java | 4 + .../mapreduce/db/SQLServerDBRecordReader.java | 57 ++++- .../db/TestSQLServerDBRecordReader.java | 214 +++++++++++++++++++ 3 files changed, 264 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/d2bdef49/src/java/org/apache/sqoop/mapreduce/db/DBRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/db/DBRecordReader.java b/src/java/org/apache/sqoop/mapreduce/db/DBRecordReader.java index a78eb06..eed5780 100644 --- a/src/java/org/apache/sqoop/mapreduce/db/DBRecordReader.java +++ b/src/java/org/apache/sqoop/mapreduce/db/DBRecordReader.java @@ -332,4 +332,8 @@ public class DBRecordReader<T extends DBWritable> extends protected Configuration getConf(){ return conf; } + + ResultSet getResultSet() { + return results; + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/d2bdef49/src/java/org/apache/sqoop/mapreduce/db/SQLServerDBRecordReader.java ---------------------------------------------------------------------- diff --git
a/src/java/org/apache/sqoop/mapreduce/db/SQLServerDBRecordReader.java b/src/java/org/apache/sqoop/mapreduce/db/SQLServerDBRecordReader.java index 9a3621b..1dea842 100644 --- a/src/java/org/apache/sqoop/mapreduce/db/SQLServerDBRecordReader.java +++ b/src/java/org/apache/sqoop/mapreduce/db/SQLServerDBRecordReader.java @@ -20,6 +20,7 @@ package org.apache.sqoop.mapreduce.db; import java.io.IOException; import java.sql.Connection; import java.sql.SQLException; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,7 +34,7 @@ import com.cloudera.sqoop.mapreduce.db.DBConfiguration; import com.cloudera.sqoop.mapreduce.db.DBInputFormat; import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat; -import org.apache.sqoop.lib.SqoopRecord; +import org.apache.sqoop.lib.SqoopRecord; /** * A RecordReader that reads records from a SQL table. @@ -41,7 +42,7 @@ import org.apache.sqoop.lib.SqoopRecord; * connection failure handler */ public class SQLServerDBRecordReader<T extends SqoopRecord> extends - SqlServerRecordReader<T> { + SqlServerRecordReader<T> { private static final Log LOG = LogFactory.getLog(SQLServerDBRecordReader.class); @@ -55,7 +56,7 @@ public class SQLServerDBRecordReader<T extends SqoopRecord> extends // Name of the split column used to re-generate selectQueries after // connection failures private String splitColumn; - private String lastRecordKey; + private String lastRecordValue; public SQLServerDBRecordReader(DBInputFormat.DBInputSplit split, Class<T> inputClass, Configuration conf, Connection conn, @@ -67,15 +68,48 @@ public class SQLServerDBRecordReader<T extends SqoopRecord> extends @Override /** {@inheritDoc} */ public T getCurrentValue() { - T val = super.getCurrentValue(); - // Lookup the key of the last read record to use for recovering - // As documented, the map may not be null, though it may be empty.
- Object lastRecordSplitCol = val.getFieldMap().get(splitColumn); - lastRecordKey = (lastRecordSplitCol == null) ? null - : lastRecordSplitCol.toString(); + T val = currentValue(); + + saveCurrentValue(val); + return val; } + T currentValue() { + return super.getCurrentValue(); + } + + void saveCurrentValue(T value) { + lastRecordValue = getCurrentValueOfSplitByColumnFromORM(value, splitColumn); + } + + private String getCurrentValueOfSplitByColumnFromORM(T generatedORMRecord, String columnName) { + Object result = generatedORMRecord.getFieldMap().get(columnName); + if (result != null) { + return result.toString(); + } + return columnName == null ? null : getCurrentValueOfSplitByColumnFromORMIfSplitByDoesNotMatch(generatedORMRecord, columnName); + } + + /* + * SQOOP-3139: workaround for databases/tables/columns used in case-insensitive mode, where the + * value passed to --split-by matches the generated column name only when letter case is ignored, + * e.g.: columnName.equals(splitBy) is false but columnName.equalsIgnoreCase(splitBy) is true. + * Callers must pass a non-null columnName. + */ + private String getCurrentValueOfSplitByColumnFromORMIfSplitByDoesNotMatch(T generatedORMRecord, String columnName) { + for (Map.Entry<String, Object> field : generatedORMRecord.getFieldMap().entrySet()) { + if (columnName.equalsIgnoreCase(field.getKey())) { + return field.getValue() != null ? field.getValue().toString() : null; + } + } + return null; + } + + String getLastRecordValue() { + return lastRecordValue; + } + /** * Load the SQLFailureHandler configured for use by the record reader.
*/ @@ -202,7 +236,7 @@ public class SQLServerDBRecordReader<T extends SqoopRecord> extends // Last seen record key is only expected to be unavailable if no reads // ever happened String selectQuery; - if (lastRecordKey == null) { + if (lastRecordValue == null) { selectQuery = super.getSelectQuery(); } else { // If last record key is available, construct the select query to start @@ -212,7 +246,7 @@ public class SQLServerDBRecordReader<T extends SqoopRecord> extends StringBuilder lowerClause = new StringBuilder(); lowerClause.append(getDBConf().getInputOrderBy()); lowerClause.append(" > "); - lowerClause.append(lastRecordKey.toString()); + lowerClause.append(lastRecordValue); // Get the select query with the lowerClause, and split upper clause selectQuery = getSelectQuery(lowerClause.toString(), @@ -221,4 +255,5 @@ public class SQLServerDBRecordReader<T extends SqoopRecord> extends return selectQuery; } + } http://git-wip-us.apache.org/repos/asf/sqoop/blob/d2bdef49/src/test/org/apache/sqoop/mapreduce/db/TestSQLServerDBRecordReader.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/mapreduce/db/TestSQLServerDBRecordReader.java b/src/test/org/apache/sqoop/mapreduce/db/TestSQLServerDBRecordReader.java new file mode 100644 index 0000000..fc04a90 --- /dev/null +++ b/src/test/org/apache/sqoop/mapreduce/db/TestSQLServerDBRecordReader.java @@ -0,0 +1,214 @@ +package org.apache.sqoop.mapreduce.db; + +import com.cloudera.sqoop.lib.DelimiterSet; +import com.cloudera.sqoop.lib.LargeObjectLoader; +import com.cloudera.sqoop.lib.RecordParser; +import com.cloudera.sqoop.mapreduce.db.DBConfiguration; +import com.cloudera.sqoop.mapreduce.db.DBInputFormat; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import
org.apache.sqoop.lib.SqoopRecord; +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +public class TestSQLServerDBRecordReader { + + private static final String SPLIT_BY_COLUMN = "myCol"; + private static final String COL_NAME_SAME_AS_SPLIT_BY = SPLIT_BY_COLUMN; + private static final String UPPERCASE_COL_NAME = SPLIT_BY_COLUMN.toUpperCase(); + private static final String ANY_VALUE_FOR_COL = "Value"; + private static final String NULL_VALUE_FOR_COL = null; + + private SQLServerDBRecordReader reader; + + @Before + public void before() throws Exception { + DBInputFormat.DBInputSplit split = mock(DBInputFormat.DBInputSplit.class); + Configuration conf = new Configuration(); + conf.set(SQLServerDBInputFormat.IMPORT_FAILURE_HANDLER_CLASS, SQLFailureHandlerStub.class.getName()); + Connection connection = mock(Connection.class); + DBConfiguration dbConfiguration = mock(DBConfiguration.class); + when(dbConfiguration.getInputOrderBy()).thenReturn(SPLIT_BY_COLUMN); + + reader = spy(new SQLServerDBRecordReader(split, SqlTableClassStub.class, conf, connection, dbConfiguration, "", new String[]{}, "", "")); + + + doAnswer(new Answer<String>() { + @Override + public String answer(InvocationOnMock invocationOnMock) throws Throwable { + return StringUtils.EMPTY; + } + }).when(reader).getSelectQuery(); + + doAnswer(new
Answer<ResultSet>() { + @Override + public ResultSet answer(InvocationOnMock invocationOnMock) throws Throwable { + return mock(ResultSet.class); + } + }).when(reader).executeQuery(anyString()); + + reader.initialize(mock(InputSplit.class), mock(TaskAttemptContext.class)); + + } + + @Test + public void returnNullIfTheLastRecordValueIsNull() { + when(reader.currentValue()).thenReturn(new SqlTableClassStub(COL_NAME_SAME_AS_SPLIT_BY, NULL_VALUE_FOR_COL)); + reader.getCurrentValue(); + assertEquals(NULL_VALUE_FOR_COL, reader.getLastRecordValue()); + } + + @Test + public void returnNullIfTheLastRecordValueIsNullAndColumnNameIsDifferent() { + when(reader.currentValue()).thenReturn(new SqlTableClassStub(UPPERCASE_COL_NAME, NULL_VALUE_FOR_COL)); + reader.getCurrentValue(); + assertEquals(NULL_VALUE_FOR_COL, reader.getLastRecordValue()); + } + + @Test + public void returnLastSavedValueWhenColumnNameIsTheSameAsSplitByColumn() { + when(reader.currentValue()).thenReturn(new SqlTableClassStub(COL_NAME_SAME_AS_SPLIT_BY, ANY_VALUE_FOR_COL)); + reader.getCurrentValue(); + + assertEquals(ANY_VALUE_FOR_COL, reader.getLastRecordValue()); + } + + /* + * Checks the case-insensitive fallback: the generated column name (MYCOL) and the + * --split-by parameter (myCol) differ only in letter case, as on a case-insensitive DB + */ + @Test + public void returnLastSavedValueWhenColumnNameDifferentFromSplitByColumn() { + when(reader.currentValue()).thenReturn(new SqlTableClassStub(UPPERCASE_COL_NAME, ANY_VALUE_FOR_COL)); + reader.getCurrentValue(); + + assertEquals(ANY_VALUE_FOR_COL, reader.getLastRecordValue()); + } + + private static class SqlTableClassStub extends SqoopRecord { + private String colName; + private String colValue; + + public SqlTableClassStub(String colName, String colValue) { + this.colName = colName; + this.colValue = colValue; + } + + @Override + public Map<String, Object> getFieldMap() { + Map<String, Object> fields = new HashMap<String, Object>(); + fields.put(colName, colValue); + return fields; + } + + @Override + public void
parse(CharSequence s) throws RecordParser.ParseError { + + } + + @Override + public void parse(Text s) throws RecordParser.ParseError { + + } + + @Override + public void parse(byte[] s) throws RecordParser.ParseError { + + } + + @Override + public void parse(char[] s) throws RecordParser.ParseError { + + } + + @Override + public void parse(ByteBuffer s) throws RecordParser.ParseError { + + } + + @Override + public void parse(CharBuffer s) throws RecordParser.ParseError { + + } + + @Override + public void loadLargeObjects(LargeObjectLoader objLoader) throws SQLException, IOException, InterruptedException { + + } + + @Override + public int write(PreparedStatement stmt, int offset) throws SQLException { + return 0; + } + + @Override + public String toString(DelimiterSet delimiters) { + return null; + } + + @Override + public int getClassFormatVersion() { + return 0; + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + + } + + @Override + public void write(PreparedStatement statement) throws SQLException { + + } + + @Override + public void readFields(ResultSet resultSet) throws SQLException { + + } + + + } + + private static class SQLFailureHandlerStub extends SQLFailureHandler { + + @Override + public boolean canHandleFailure(Throwable failureCause) { + return false; + } + + @Override + public Connection recover() throws IOException { + return null; + } + } + +} \ No newline at end of file
