Updated Branches: refs/heads/trunk ead68ad37 -> b2d3a682a
SQOOP-1030: Enable column types Binary and Varbinary parsing in Sqoop for export (Shuaishuai Nie via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/b2d3a682 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/b2d3a682 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/b2d3a682 Branch: refs/heads/trunk Commit: b2d3a682adae553902897466555e2766ae665619 Parents: ead68ad Author: Jarek Jarcec Cecho <[email protected]> Authored: Thu May 23 01:37:10 2013 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Thu May 23 01:37:10 2013 -0700 ---------------------------------------------------------------------- src/java/org/apache/sqoop/orm/ClassWriter.java | 9 ++ .../manager/SQLServerManagerExportManualTest.java | 109 +++++++++++++++ 2 files changed, 118 insertions(+), 0 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/b2d3a682/src/java/org/apache/sqoop/orm/ClassWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/orm/ClassWriter.java b/src/java/org/apache/sqoop/orm/ClassWriter.java index 0202f7f..1bd2a41 100644 --- a/src/java/org/apache/sqoop/orm/ClassWriter.java +++ b/src/java/org/apache/sqoop/orm/ClassWriter.java @@ -966,6 +966,15 @@ public class ClassWriter { sb.append(" this." + colName + " = ClobRef.parse(__cur_str);\n"); } else if (javaType.equals(BlobRef.class.getName())) { sb.append(" this." + colName + " = BlobRef.parse(__cur_str);\n"); + } else if (javaType.equals(BytesWritable.class.getName())) { + // Get the unsigned byte[] from the hex string representation + // We cannot use Byte.parse() which always assumes a signed decimal byte + sb.append(" String[] strByteVal = __cur_str.trim().split(\" \");\n"); + sb.append(" byte [] byteVal = new byte[strByteVal.length];\n"); + sb.append(" for (int i = 0; i < byteVal.length; ++i) {\n"); + sb.append(" byteVal[i] = (byte)Integer.parseInt(strByteVal[i], 16);\n"); + sb.append(" }\n"); + sb.append(" this." + colName + " = new BytesWritable(byteVal);\n"); } else { LOG.error("No parser available for Java type " + javaType); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/b2d3a682/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java b/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java index 7800944..1d4534b 100644 --- a/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java +++ b/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java @@ -55,6 +55,7 @@ public class SQLServerManagerExportManualTest extends ExportJobTestCase { static final String DATABASE_PASSWORD = "PASSWORD"; static final String SCHEMA_DBO = "dbo"; static final String DBO_TABLE_NAME = "EMPLOYEES_MSSQL"; + static final String DBO_BINARY_TABLE_NAME = "BINARYTYPE_MSSQL"; static final String SCHEMA_SCH = "sch"; static final String SCH_TABLE_NAME = "PRIVATE_TABLE"; static final String CONNECT_STRING = HOST_URL @@ -171,6 +172,75 @@ public class SQLServerManagerExportManualTest extends ExportJobTestCase { } } + public void createSQLServerBinaryTypeTable(String schema, String table) { + String fulltableName = manager.escapeObjectName(schema) + + "." + manager.escapeObjectName(table); + + Statement stmt = null; + + // Create schema if needed + try { + conn = manager.getConnection(); + stmt = conn.createStatement(); + stmt.execute("CREATE SCHEMA " + schema); + conn.commit(); + } catch (SQLException sqlE) { + LOG.info("Can't create schema: " + sqlE.getMessage()); + } finally { + try { + if (null != stmt) { + stmt.close(); + } + } catch (Exception ex) { + LOG.warn("Exception while closing stmt", ex); + } + } + + // Drop the existing table, if there is one. + try { + conn = manager.getConnection(); + stmt = conn.createStatement(); + stmt.execute("DROP TABLE " + fulltableName); + conn.commit(); + } catch (SQLException sqlE) { + LOG.info("Table was not dropped: " + sqlE.getMessage()); + } finally { + try { + if (null != stmt) { + stmt.close(); + } + } catch (Exception ex) { + LOG.warn("Exception while closing stmt", ex); + } + } + + // Create and populate table + try { + conn = manager.getConnection(); + conn.setAutoCommit(false); + stmt = conn.createStatement(); + + // create the database table and populate it with data. + stmt.executeUpdate("CREATE TABLE " + fulltableName + " (" + + "id INT PRIMARY KEY, " + + "b1 BINARY(10), " + + "b2 VARBINARY(10))"); + conn.commit(); + } catch (SQLException sqlE) { + LOG.error("Encountered SQL Exception: ", sqlE); + sqlE.printStackTrace(); + fail("SQLException when running test setUp(): " + sqlE); + } finally { + try { + if (null != stmt) { + stmt.close(); + } + } catch (Exception ex) { + LOG.warn("Exception while closing connection/stmt", ex); + } + } + } + @After public void tearDown() { super.tearDown(); @@ -283,6 +353,45 @@ public class SQLServerManagerExportManualTest extends ExportJobTestCase { assertRowCount(2, escapeObjectName(DBO_TABLE_NAME), conn); } + public void testSQLServerBinaryType() throws IOException, SQLException { + createSQLServerBinaryTypeTable(SCHEMA_DBO, DBO_BINARY_TABLE_NAME); + createTestFile("inputFile", new String[] { + "1,73 65 63 72 65 74 00 00 00 00,73 65 63 72 65 74" + }); + String[] expectedContent = {"73656372657400000000", "736563726574"}; + runExport(getArgv(DBO_BINARY_TABLE_NAME)); + assertRowCount(1, escapeObjectName(DBO_BINARY_TABLE_NAME), conn); + checkSQLBinaryTableContent(expectedContent, escapeObjectName(DBO_BINARY_TABLE_NAME), conn); + } + + public static void checkSQLBinaryTableContent(String[] expected, String tableName, Connection connection){ + Statement stmt = null; + ResultSet rs = null; + try { + stmt = connection.createStatement(); + rs = stmt.executeQuery("SELECT TOP 1 [b1], [b2] FROM " + tableName); + rs.next(); + assertEquals(expected[0], rs.getString("b1")); + assertEquals(expected[1], rs.getString("b2")); + } catch (SQLException e) { + LOG.error("Can't verify table content", e); + fail(); + } finally { + try { + connection.commit(); + + if (stmt != null) { + stmt.close(); + } + if (rs != null) { + rs.close(); + } + } catch (SQLException ex) { + LOG.info("Ignored exception in finally block."); + } + } + } + public static void assertRowCount(long expected, String tableName, Connection connection) {
