Repository: sqoop Updated Branches: refs/heads/sqoop2 927c72d15 -> ec0544c6f
SQOOP-2443: Sqoop2: Generic JDBC: Properly detect compound primary keys in GenericJdbcExecutor (Jarek Jarcec Cecho via Abraham Elmahrek) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/ec0544c6 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/ec0544c6 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/ec0544c6 Branch: refs/heads/sqoop2 Commit: ec0544c6f5436abeb6880ec2929eb4190dd94069 Parents: 927c72d Author: Abraham Elmahrek <[email protected]> Authored: Wed Aug 12 14:54:54 2015 -0700 Committer: Abraham Elmahrek <[email protected]> Committed: Wed Aug 12 14:54:54 2015 -0700 ---------------------------------------------------------------------- .../error/code/GenericJdbcConnectorError.java | 4 ++ .../connector/jdbc/GenericJdbcExecutor.java | 56 +++++++++++++++++--- .../jdbc/GenericJdbcFromInitializer.java | 10 +++- .../connector/jdbc/GenericJdbcExecutorTest.java | 13 ++++- docs/src/site/sphinx/Connectors.rst | 2 +- 5 files changed, 74 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/ec0544c6/common/src/main/java/org/apache/sqoop/error/code/GenericJdbcConnectorError.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/error/code/GenericJdbcConnectorError.java b/common/src/main/java/org/apache/sqoop/error/code/GenericJdbcConnectorError.java index f18acbd..9a9bb66 100644 --- a/common/src/main/java/org/apache/sqoop/error/code/GenericJdbcConnectorError.java +++ b/common/src/main/java/org/apache/sqoop/error/code/GenericJdbcConnectorError.java @@ -89,6 +89,10 @@ public enum GenericJdbcConnectorError implements ErrorCode { GENERIC_JDBC_CONNECTOR_0023("Received error from the database"), + GENERIC_JDBC_CONNECTOR_0024("Multiple tables of the same name in different schema/catalog"), + + GENERIC_JDBC_CONNECTOR_0025("No primary key"), + ; private final String message; http://git-wip-us.apache.org/repos/asf/sqoop/blob/ec0544c6/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java index 1aeca7e..3770e07 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java @@ -38,9 +38,12 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; import java.sql.Timestamp; +import java.util.AbstractMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Properties; +import java.util.Set; /** * Database executor that is based on top of JDBC spec. @@ -366,24 +369,63 @@ public class GenericJdbcExecutor { * * (schema, table) * * (table) * Return value of any combination is "undefined". - * @return Primary key's name + * @return All columns that are consisting of tables primary key (in order) */ - public String getPrimaryKey(String ...identifiers) { + public String[] getPrimaryKey(String ...identifiers) { int index = 0; String catalog = identifiers.length >= 3 ? identifiers[index++] : null; String schema = identifiers.length >= 2 ? identifiers[index++] : null; String table = identifiers[index]; + /* Using the getPrimaryKeys call have few challenges that we're protecting ourselves against here: + * + * 1) Call to getPrimaryKeys() returns columns ordered by COLUMN_NAME and not by KEY_SEQ. Therefore + * we have to manually re-order them in order that make sense to us (e.g. by KEY_SEQ). + * + * 2) If we run the search with catalog and schema arguments set to NULL (e.g. we're searching only + * by table name), we'll get all tables with given name. This is a problem in case that users will have + * the same table name in multiple schemas (or catalogs). As we don't want users to force remembering + * what is the default catalog and schema name for their tables, we've chosen more defensive approach - + * we'll search only by table name and detect if we found two different tables, only in this case we'll + * error out requesting user to specify which schema we need to use. + */ + List<AbstractMap.SimpleEntry<String, Short>> primaryKeyColumns = new LinkedList<>(); + Set<String> catalogNames = new HashSet<>(); + Set<String> schemaNames = new HashSet<>(); + try { - DatabaseMetaData dbmd = connection.getMetaData(); - ResultSet rs = dbmd.getPrimaryKeys(catalog, schema, table); + ResultSet rs = connection.getMetaData().getPrimaryKeys(catalog, schema, table); + assert rs != null; + + // Load data from the getPrimaryKeys() call + while(rs.next()) { + primaryKeyColumns.add(new AbstractMap.SimpleEntry<>( + rs.getString("COLUMN_NAME"), + rs.getShort("KEY_SEQ") + )); + catalogNames.add(rs.getString("TABLE_CAT")); + schemaNames.add(rs.getString("TABLE_SCHEM")); + } - if (rs != null && rs.next()) { - return rs.getString("COLUMN_NAME"); + // Verification + if(catalogNames.size() > 1 || schemaNames.size() > 1) { + throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0024, + "For search (" + catalog + ", " + schema + ", " + table + ") we found the table in catalogs [" + StringUtils.join(catalogNames, ", ") + "] and schemas [" + StringUtils.join(schemaNames, ", ") + "]"); + } - } else { + // Few shortcuts so that we don't have run full loop + if(primaryKeyColumns.isEmpty()) { return null; + } else if(primaryKeyColumns.size() == 1){ + return new String[] {primaryKeyColumns.get(0).getKey()}; + } + + // Properly sort the columns by KEY_SEQ and return result + String [] ret = new String[primaryKeyColumns.size()]; + for(AbstractMap.SimpleEntry<String, Short> entry : primaryKeyColumns) { + ret[entry.getValue() - 1] = entry.getKey(); } + return ret; } catch (SQLException e) { logSQLException(e); http://git-wip-us.apache.org/repos/asf/sqoop/blob/ec0544c6/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java index 9d8e4e7..8bf7b6e 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java @@ -135,7 +135,15 @@ public class GenericJdbcFromInitializer extends Initializer<LinkConfiguration, F String partitionColumnName = jobConf.fromJobConfig.partitionColumn; // If it's not specified, we can use primary key of given table (if it's table based import) if (StringUtils.isBlank(partitionColumnName) && tableImport) { - partitionColumnName = executor.getPrimaryKey(jobConf.fromJobConfig.schemaName, jobConf.fromJobConfig.tableName); + String [] primaryKeyColumns = executor.getPrimaryKey(jobConf.fromJobConfig.schemaName, jobConf.fromJobConfig.tableName); + LOG.info("Found primary key columns [" + StringUtils.join(primaryKeyColumns, ", ") + "]"); + if(primaryKeyColumns == null) { + throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0025, "Please specify partition column."); + } else if (primaryKeyColumns.length > 1) { + LOG.warn("Table have compound primary key, for partitioner we're using only first column of the key: " + primaryKeyColumns[0]); + } + + partitionColumnName = primaryKeyColumns[0]; } // If we don't have partition column name, we will error out if (partitionColumnName != null) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/ec0544c6/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java index a482ac4..59c12f3 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java @@ -35,6 +35,7 @@ public class GenericJdbcExecutorTest { private final String table; private final String emptyTable; private final String schema; + private final String compoundPrimaryKeyTable; private GenericJdbcExecutor executor; private static final int START = -10; @@ -44,6 +45,7 @@ public class GenericJdbcExecutorTest { table = getClass().getSimpleName().toUpperCase(); emptyTable = table + "_EMPTY"; schema = table + "_SCHEMA"; + compoundPrimaryKeyTable = table + "_COMPOUND"; } @BeforeMethod(alwaysRun = true) @@ -53,6 +55,7 @@ public class GenericJdbcExecutorTest { executor.executeUpdate("CREATE TABLE " + executor.encloseIdentifier(emptyTable )+ "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))"); executor.executeUpdate("CREATE TABLE " + executor.encloseIdentifier(table) + "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))"); executor.executeUpdate("CREATE TABLE " + executor.encloseIdentifiers(schema, table) + "(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))"); + executor.executeUpdate("CREATE TABLE " + executor.encloseIdentifier(compoundPrimaryKeyTable) + "(ICOL INTEGER, VCOL VARCHAR(20), PRIMARY KEY(VCOL, ICOL))"); for (int i = 0; i < NUMBER_OF_ROWS; i++) { int value = START + i; @@ -90,8 +93,14 @@ public class GenericJdbcExecutorTest { assertNull(executor.getPrimaryKey("non-existing-schema", "non-existing-table")); assertNull(executor.getPrimaryKey("non-existing-catalog", "non-existing-schema", "non-existing-table")); - assertEquals(executor.getPrimaryKey(table), "ICOL"); - assertEquals(executor.getPrimaryKey(schema, table), "ICOL"); + assertEquals(executor.getPrimaryKey(schema, table), new String[] {"ICOL"}); + assertEquals(executor.getPrimaryKey(compoundPrimaryKeyTable), new String[] {"VCOL", "ICOL"}); + } + + @Test(expectedExceptions = SqoopException.class) + public void TestGetPrimaryKeySameTableInMultipleSchemas() { + // Same table name exists in two schemas and therefore we should fail here + executor.getPrimaryKey(table); } @Test http://git-wip-us.apache.org/repos/asf/sqoop/blob/ec0544c6/docs/src/site/sphinx/Connectors.rst ---------------------------------------------------------------------- diff --git a/docs/src/site/sphinx/Connectors.rst b/docs/src/site/sphinx/Connectors.rst index af54467..41571ba 100644 --- a/docs/src/site/sphinx/Connectors.rst +++ b/docs/src/site/sphinx/Connectors.rst @@ -80,7 +80,7 @@ Inputs associated with the Job configuration for the FROM direction include: | | | *Optional* Comma separated list of columns. | | +-----------------------------+---------+-------------------------------------------------------------------------+---------------------------------------------+ | Partition column name | Map | The column name used to partition the data transfer process. | col1 | -| | | *Optional*. Defaults to primary key of table. | | +| | | *Optional*. Defaults to table's first column of primary key. | | +-----------------------------+---------+-------------------------------------------------------------------------+---------------------------------------------+ | Null value allowed for | Boolean | True or false depending on whether NULL values are allowed in data | true | | the partition column | | of the Partition column. *Optional*. | |
