Repository: sqoop Updated Branches: refs/heads/sqoop2 7e092f5bf -> cea627fa1
SQOOP-2797: Sqoop2: Add new schema object for the Blob (Colin Ma via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/cea627fa Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/cea627fa Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/cea627fa Branch: refs/heads/sqoop2 Commit: cea627fa1465a4a900c8ad02934df568f93f45b1 Parents: 7e092f5 Author: Jarek Jarcec Cecho <[email protected]> Authored: Wed Feb 3 21:30:57 2016 -0800 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Wed Feb 3 21:30:57 2016 -0800 ---------------------------------------------------------------------- .../common/test/asserts/ProviderAsserts.java | 16 ++++++-- .../sqoop/common/test/db/DatabaseProvider.java | 18 ++++++--- .../sqoop/common/test/db/DerbyProvider.java | 2 +- .../common/test/db/types/DerbyTypeList.java | 9 ++++- .../sqoop/json/util/SchemaSerialization.java | 4 ++ .../java/org/apache/sqoop/schema/type/Blob.java | 39 ++++++++++++++++++++ .../apache/sqoop/schema/type/ColumnType.java | 1 + .../connector/jdbc/GenericJdbcExtractor.java | 8 +++- .../connector/jdbc/util/SqlTypesUtils.java | 5 ++- .../sqoop/connector/common/SqoopIDFUtils.java | 2 + 10 files changed, 90 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/cea627fa/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java ---------------------------------------------------------------------- diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java b/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java index ae1b60d..e9cea2d 100644 --- a/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java +++ b/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java @@ -23,8 +23,8 @@ import org.apache.sqoop.common.test.db.TableName; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.Blob; import java.sql.SQLException; -import java.sql.Statement; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; @@ -37,6 +37,7 @@ import static org.testng.Assert.fail; public class ProviderAsserts { private static final Logger LOG = Logger.getLogger(ProviderAsserts.class); + private static final String ERROR_MESSAGE_PRFIX = "Columns do not match on position: "; /** * Assert row in the table. @@ -59,8 +60,11 @@ public class ProviderAsserts { if (expectedValue == null) { assertNull(actualValue); } else { - assertEquals(expectedValue.toString(), actualValue.toString(), - "Columns do not match on position: " + i); + if (expectedValue instanceof Blob) { + assertBlob(rs.getBlob(i), (Blob) expectedValue, i); + } else { + assertEquals(expectedValue.toString(), actualValue.toString(), ERROR_MESSAGE_PRFIX + i); + } } i++; } @@ -74,6 +78,12 @@ public class ProviderAsserts { } } + private static void assertBlob(Blob actualValue, Blob expectedValue, int colPosition) throws SQLException { + byte[] actual = actualValue.getBytes(1, (int)actualValue.length()); + byte[] expected = expectedValue.getBytes(1, (int)expectedValue.length()); + assertEquals(actual, expected, ERROR_MESSAGE_PRFIX + colPosition); + } + private ProviderAsserts() { // Instantiation is prohibited } http://git-wip-us.apache.org/repos/asf/sqoop/blob/cea627fa/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java ---------------------------------------------------------------------- diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java b/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java index afc5016..f3efa92 100644 --- a/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java +++ b/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java @@ -24,7 +24,6 @@ import org.apache.sqoop.common.test.db.types.DefaultTypeList; import java.math.BigDecimal; import java.sql.Connection; -import java.sql.Date; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -32,6 +31,7 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; import java.sql.Timestamp; +import java.sql.Blob; import java.util.LinkedList; import java.util.List; @@ -157,7 +157,7 @@ abstract public class DatabaseProvider { * * @return */ - public DatabaseTypeList getDatabaseTypes() { + public DatabaseTypeList getDatabaseTypes() throws Exception { return new DefaultTypeList(); } @@ -304,7 +304,9 @@ abstract public class DatabaseProvider { * Return rows that match given conditions. * * @param tableName Table name - * @param conditions Conditions in form of double values - column name and value, for example: "id", 1 or "last_update_date", null + * @param conditions Conditions in form of double values - column name and value, for example: + * "id", 1 or "last_update_date", null. + * For Blob data type, it can't be used as a condition in where clause directly, skip it. * @return PreparedStatement representing the requested query */ public PreparedStatement getRowsPreparedStatement(TableName tableName, Object[] conditions) { @@ -326,9 +328,10 @@ abstract public class DatabaseProvider { throw new RuntimeException("Each odd item should be a string with column name."); } - if(value == null) { + // Blob can't be used in where clause directly, skip the where clause for Blob + if (value == null) { conditionList.add(escapeColumnName((String) columnName) + " IS NULL"); - } else { + } else if (! (value instanceof Blob)) { conditionList.add(escapeColumnName((String) columnName) + " = ?"); } } @@ -340,7 +343,8 @@ abstract public class DatabaseProvider { PreparedStatement preparedStatement = getConnection().prepareStatement(sb.toString()); for(int i = 1; i < conditions.length; i += 2) { Object value = conditions[i]; - if (value != null) { + // skip the Blob data type + if (value != null && ! (value instanceof Blob)) { insertObjectIntoPreparedStatement(preparedStatement, i, value); } } @@ -374,6 +378,8 @@ abstract public class DatabaseProvider { preparedStatement.setTimestamp(parameterIndex, (Timestamp) value); } else if (value instanceof BigDecimal) { preparedStatement.setBigDecimal(parameterIndex, (BigDecimal) value); + } else if (value instanceof Blob) { + preparedStatement.setBlob(parameterIndex, (Blob) value); } else { preparedStatement.setObject(parameterIndex, value); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/cea627fa/common-test/src/main/java/org/apache/sqoop/common/test/db/DerbyProvider.java ---------------------------------------------------------------------- diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/db/DerbyProvider.java b/common-test/src/main/java/org/apache/sqoop/common/test/db/DerbyProvider.java index 8f3e434..839e561 100644 --- a/common-test/src/main/java/org/apache/sqoop/common/test/db/DerbyProvider.java +++ b/common-test/src/main/java/org/apache/sqoop/common/test/db/DerbyProvider.java @@ -169,7 +169,7 @@ public class DerbyProvider extends DatabaseProvider { } @Override - public DatabaseTypeList getDatabaseTypes() { + public DatabaseTypeList getDatabaseTypes() throws Exception { return new DerbyTypeList(); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/cea627fa/common-test/src/main/java/org/apache/sqoop/common/test/db/types/DerbyTypeList.java ---------------------------------------------------------------------- diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/db/types/DerbyTypeList.java b/common-test/src/main/java/org/apache/sqoop/common/test/db/types/DerbyTypeList.java index 642651d..fc02b83 100644 --- a/common-test/src/main/java/org/apache/sqoop/common/test/db/types/DerbyTypeList.java +++ b/common-test/src/main/java/org/apache/sqoop/common/test/db/types/DerbyTypeList.java @@ -17,14 +17,17 @@ */ package org.apache.sqoop.common.test.db.types; +import javax.sql.rowset.serial.SerialBlob; +import java.io.UnsupportedEncodingException; import java.math.BigDecimal; import java.math.RoundingMode; +import java.sql.SQLException; /** * Source: https://db.apache.org/derby/docs/10.7/ref/crefsqlj31068.html */ public class DerbyTypeList extends DatabaseTypeList { - public DerbyTypeList() { + public DerbyTypeList() throws SQLException, UnsupportedEncodingException { super(); // Numeric types @@ -106,6 +109,10 @@ public class DerbyTypeList extends DatabaseTypeList { .build()); // BLOB + add(DatabaseType.builder("BLOB(1K)") + .addExample("", new SerialBlob("test data".getBytes("ISO-8859-1")), "'test data'") + .build()); + // CLOB // Time // Timestamp http://git-wip-us.apache.org/repos/asf/sqoop/blob/cea627fa/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java b/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java index 3a3f9e8..ee385c0 100644 --- a/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java +++ b/common/src/main/java/org/apache/sqoop/json/util/SchemaSerialization.java @@ -31,6 +31,7 @@ import org.apache.sqoop.schema.type.AbstractString; import org.apache.sqoop.schema.type.Array; import org.apache.sqoop.schema.type.Binary; import org.apache.sqoop.schema.type.Bit; +import org.apache.sqoop.schema.type.Blob; import org.apache.sqoop.schema.type.Column; import org.apache.sqoop.schema.type.ColumnType; import org.apache.sqoop.schema.type.Date; @@ -238,6 +239,9 @@ public class SchemaSerialization { case BIT: output = new Bit(name); break; + case BLOB: + output = new Blob(name); + break; case DATE: output = new Date(name); break; http://git-wip-us.apache.org/repos/asf/sqoop/blob/cea627fa/common/src/main/java/org/apache/sqoop/schema/type/Blob.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/schema/type/Blob.java b/common/src/main/java/org/apache/sqoop/schema/type/Blob.java new file mode 100644 index 0000000..17d5e6b --- /dev/null +++ b/common/src/main/java/org/apache/sqoop/schema/type/Blob.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.schema.type; + +public class Blob extends Binary { + + public Blob(String name) { + super(name); + } + + @Override + public ColumnType getType() { + return ColumnType.BLOB; + } + + @Override + public String toString() { + return new StringBuilder("Blob{") + .append(super.toString()) + .append("}") + .toString(); + } + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/cea627fa/common/src/main/java/org/apache/sqoop/schema/type/ColumnType.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/schema/type/ColumnType.java b/common/src/main/java/org/apache/sqoop/schema/type/ColumnType.java index 9e415bf..ac98ee8 100644 --- a/common/src/main/java/org/apache/sqoop/schema/type/ColumnType.java +++ b/common/src/main/java/org/apache/sqoop/schema/type/ColumnType.java @@ -29,6 +29,7 @@ public enum ColumnType { ARRAY, BINARY, BIT, + BLOB, DATE, DATE_TIME, DECIMAL, http://git-wip-us.apache.org/repos/asf/sqoop/blob/cea627fa/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java index 0235f28..41af177 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java @@ -21,7 +21,7 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; -import java.sql.Statement; +import java.sql.Blob; import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; @@ -68,6 +68,11 @@ public class GenericJdbcExtractor extends Extractor<LinkConfiguration, FromJobCo // check type of the column Column schemaColumn = schemaColumns[i]; switch (schemaColumn.getType()) { + case BLOB: + // convert the blob to byte[] + Blob blob = resultSet.getBlob(i + 1); + array[i] = blob.getBytes(1, (int)blob.length()); + break; case DATE: // convert the sql date to JODA time as prescribed the Sqoop IDF spec array[i] = LocalDate.fromDateFields(resultSet.getDate(i + 1)); @@ -83,7 +88,6 @@ public class GenericJdbcExtractor extends Extractor<LinkConfiguration, FromJobCo default: //for anything else array[i] = resultSet.getObject(i + 1); - } } context.getDataWriter().writeArrayRecord(array); http://git-wip-us.apache.org/repos/asf/sqoop/blob/cea627fa/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java index a6ffa7c..f8f9f0d 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java @@ -20,6 +20,7 @@ package org.apache.sqoop.connector.jdbc.util; import org.apache.sqoop.schema.type.Column; import org.apache.sqoop.schema.type.Binary; import org.apache.sqoop.schema.type.Bit; +import org.apache.sqoop.schema.type.Blob; import org.apache.sqoop.schema.type.Date; import org.apache.sqoop.schema.type.DateTime; import org.apache.sqoop.schema.type.Decimal; @@ -88,9 +89,11 @@ public class SqlTypesUtils { case Types.BOOLEAN: return new Bit(columnName); + case Types.BLOB: + return new Blob(columnName); + case Types.BINARY: case Types.VARBINARY: - case Types.BLOB: case Types.LONGVARBINARY: return new Binary(columnName); http://git-wip-us.apache.org/repos/asf/sqoop/blob/cea627fa/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java index fc25100..9baa743 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java @@ -608,6 +608,7 @@ public class SqoopIDFUtils { csvString.append(toCSVString(objectArray[i].toString())); break; case BINARY: + case BLOB: case UNKNOWN: csvString.append(toCSVByteArray((byte[]) objectArray[i])); break; @@ -714,6 +715,7 @@ public class SqoopIDFUtils { returnValue = toText(csvString); break; case BINARY: + case BLOB: // Unknown is treated as a binary type case UNKNOWN: returnValue = toByteArray(csvString);
