Updated Branches: refs/heads/sqoop2 344c6309c -> aa8e1e779
http://git-wip-us.apache.org/repos/asf/sqoop/blob/aa8e1e77/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
index 3e9789c..96818ba 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
@@ -29,9 +29,12 @@ import org.apache.sqoop.common.MutableContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
 import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
+import org.apache.sqoop.connector.jdbc.util.SqlTypesUtils;
 import org.apache.sqoop.job.Constants;
 import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Column;
 import org.apache.sqoop.utils.ClassUtils;
 
 public class GenericJdbcImportInitializer extends Initializer<ConnectionConfiguration, ImportJobConfiguration> {
@@ -61,6 +64,73 @@ public class GenericJdbcImportInitializer extends Initializer<ConnectionConfigur
     return jars;
   }
 
+  /**
+   * Describe the data this import will produce as a Schema.
+   *
+   * The configured data query is executed with its conditions token replaced
+   * by "1 = 0" and only the metadata of the result set is read.
+   *
+   * @param context Initializer context that provides the data SQL
+   * @param connectionConfiguration Connection configuration of the job
+   * @param importJobConfiguration Import job configuration
+   * @return Schema named after the configured table, or "Query" when no table
+   *         name is set, holding one column per result set column
+   * @throws SqoopException GENERIC_JDBC_CONNECTOR_0016 wrapping any SQLException
+   */
+  @Override
+  public Schema getSchema(InitializerContext context, ConnectionConfiguration connectionConfiguration, ImportJobConfiguration importJobConfiguration) {
+    configureJdbcProperties(context.getContext(), connectionConfiguration, importJobConfiguration);
+
+    String schemaName = importJobConfiguration.table.tableName;
+    if(schemaName == null) {
+      schemaName = "Query";
+    }
+
+    Schema schema = new Schema(schemaName);
+
+    ResultSet rs = null;
+    try {
+      // "1 = 0" stands in for the conditions token; presumably it keeps the
+      // database from returning rows, since only metadata is used below.
+      rs = executor.executeQuery(
+        context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL)
+          .replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0")
+      );
+
+      ResultSetMetaData rsmt = rs.getMetaData();
+      int columnCount = rsmt.getColumnCount();
+      for (int i = 1 ; i <= columnCount; i++) {
+        Column column = SqlTypesUtils.sqlTypeToAbstractType(rsmt.getColumnType(i));
+
+        // Prefer the label: it carries the alias of an SQL AS clause, while
+        // the plain name may be the underlying column's name.
+        String columnName = rsmt.getColumnLabel(i);
+        if (columnName == null || columnName.equals("")) {
+          columnName = rsmt.getColumnName(i);
+          // An empty name is as unusable as a missing one
+          if (columnName == null || columnName.equals("")) {
+            columnName = "Column " + i;
+          }
+        }
+
+        column.setName(columnName);
+        schema.addColumn(column);
+      }
+
+      return schema;
+    } catch (SQLException e) {
+      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0016, e);
+    } finally {
+      if(rs != null) {
+        try {
+          rs.close();
+        } catch (SQLException e) {
+          LOG.info("Ignoring exception while closing ResultSet", e);
+        }
+      }
+    }
+  }
+
   private void configureJdbcProperties(MutableContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) {
     String driver = connectionConfig.connection.jdbcDriver;
     String url = connectionConfig.connection.connectionString;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/aa8e1e77/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java
new file mode 100644
index 0000000..c18f165
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.util;
+
+import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.schema.type.Binary;
+import org.apache.sqoop.schema.type.Bit;
+import org.apache.sqoop.schema.type.Date;
+import org.apache.sqoop.schema.type.DateTime;
+import org.apache.sqoop.schema.type.Decimal;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
+import org.apache.sqoop.schema.type.Text;
+import org.apache.sqoop.schema.type.Time;
+import org.apache.sqoop.schema.type.Unsupported;
+
+import java.sql.Types;
+
+/**
+ * Utility class to work with SQL types.
+ */
+public class SqlTypesUtils {
+
+  /**
+   * Convert given java.sql.Types number into internal data type.
+   * Type codes without a mapping below yield an Unsupported column that carries the code.
+   * @param sqlType java.sql.Types constant
+   * @return New Column instance of the matching concrete type
+   */
+  public static Column sqlTypeToAbstractType(int sqlType) {
+    switch (sqlType) {
+      case Types.SMALLINT:
+      case Types.TINYINT:
+      case Types.INTEGER:
+      case Types.BIGINT: // 64-bit integer, not an arbitrary-precision decimal
+        return new FixedPoint();
+
+      case Types.VARCHAR:
+      case Types.CHAR:
+      case Types.LONGVARCHAR:
+      case Types.NVARCHAR:
+      case Types.NCHAR:
+      case Types.LONGNVARCHAR:
+      case Types.CLOB: // character large object, i.e. text rather than a number
+        return new Text();
+
+      case Types.DATE:
+        return new Date();
+
+      case Types.TIME:
+        return new Time();
+
+      case Types.TIMESTAMP:
+        return new DateTime();
+
+      case Types.FLOAT:
+      case Types.REAL:
+      case Types.DOUBLE:
+        return new FloatingPoint();
+
+      case Types.NUMERIC:
+      case Types.DECIMAL:
+        return new Decimal();
+
+      case Types.BIT:
+      case Types.BOOLEAN:
+        return new Bit();
+
+      case Types.BINARY:
+      case Types.VARBINARY:
+      case Types.BLOB:
+      case Types.LONGVARBINARY:
+        return new Binary();
+
+      default:
+        return new Unsupported((long)sqlType);
+    }
+  }
+
+  private SqlTypesUtils() {
+    // Instantiation is prohibited
+  }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/aa8e1e77/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
index 9f4269a..a33fa36 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
@@ -28,6 +28,10 @@ import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
 import org.apache.sqoop.job.Constants;
 import org.apache.sqoop.job.etl.Initializer;
 import
org.apache.sqoop.job.etl.InitializerContext;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
+import org.apache.sqoop.schema.type.Text;
 
 public class TestImportInitializer extends TestCase {
 
@@ -87,6 +91,20 @@ public class TestImportInitializer extends TestCase {
     }
   }
 
+  /**
+   * Return Schema representation for the testing table.
+   *
+   * @param name Name that should be used for the generated schema.
+   * @return Schema with the columns ICOL (FixedPoint), DCOL (FloatingPoint) and VCOL (Text)
+   */
+  public Schema getSchema(String name) {
+    return new Schema(name)
+      .addColumn(new FixedPoint("ICOL"))
+      .addColumn(new FloatingPoint("DCOL"))
+      .addColumn(new Text("VCOL"))
+    ;
+  }
+
   @Override
   public void tearDown() {
     executor.close();
@@ -290,6 +308,49 @@ public class TestImportInitializer extends TestCase {
         String.valueOf((double)(START+NUMBER_OF_ROWS-1)));
   }
 
+
+  @SuppressWarnings("unchecked")
+  public void testGetSchemaForTable() throws Exception {
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    ImportJobConfiguration jobConf = new ImportJobConfiguration();
+
+    connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+    jobConf.table.schemaName = schemaName;
+    jobConf.table.tableName = tableName; // table-based import: a table name is configured
+    jobConf.table.partitionColumn = "DCOL";
+
+    MutableContext context = new MutableMapContext();
+    InitializerContext initializerContext = new InitializerContext(context);
+
+    @SuppressWarnings("rawtypes")
+    Initializer initializer = new GenericJdbcImportInitializer();
+    initializer.initialize(initializerContext, connConf, jobConf);
+    Schema schema = initializer.getSchema(initializerContext, connConf, jobConf);
+    assertEquals(getSchema(tableName), schema); // the schema is expected to be named after the table
+  }
+
+  @SuppressWarnings("unchecked")
+  public void testGetSchemaForSql() throws Exception {
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    ImportJobConfiguration jobConf = new ImportJobConfiguration();
+
+    connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+    jobConf.table.schemaName = schemaName;
+    jobConf.table.sql = tableSql; // SQL statement is configured instead of a table name
+    jobConf.table.partitionColumn = "DCOL";
+
+    MutableContext context = new MutableMapContext();
+    InitializerContext initializerContext = new InitializerContext(context);
+
+    @SuppressWarnings("rawtypes")
+    Initializer initializer = new GenericJdbcImportInitializer();
+    initializer.initialize(initializerContext, connConf, jobConf);
+    Schema schema = initializer.getSchema(initializerContext, connConf, jobConf);
+    assertEquals(getSchema("Query"), schema); // no table name is set, so the expected schema name is "Query"
+  }
+
+
   @SuppressWarnings("unchecked")
   public void testTableSqlWithTableColumnsWithSchema() throws Exception {
     ConnectionConfiguration connConf = new ConnectionConfiguration();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/aa8e1e77/core/src/main/java/org/apache/sqoop/framework/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
index 5a2f490..58d6c10 100644
--- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
@@ -355,6 +355,13 @@ public class JobManager implements Reconfigurable {
       request.getConfigConnectorConnection(),
       request.getConfigConnectorJob()));
 
+    // Retrieve the schema from the connector's initializer and store it on the request summary
+    request.getSummary().setConnectorSchema(initializer.getSchema(
+      initializerContext,
+      request.getConfigConnectorConnection(),
+      request.getConfigConnectorJob()
+    ));
+
     // Bootstrap job from framework perspective
     switch (job.getType()) {
       case IMPORT:
http://git-wip-us.apache.org/repos/asf/sqoop/blob/aa8e1e77/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
index 346b84c..88744ea 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
@@ -17,6 +17,8 @@
  */
 package org.apache.sqoop.job.etl;
 
+import org.apache.sqoop.schema.Schema;
+
 import java.util.LinkedList;
 import java.util.List;
 
@@ -52,4 +54,8 @@ public abstract class Initializer<ConnectionConfiguration, JobConfiguration> {
     return new LinkedList<String>();
   }
 
+  /** Return the Schema for the given connection and job configuration; every implementation must supply it. */
+  public abstract Schema getSchema(InitializerContext context,
+      ConnectionConfiguration connectionConfiguration, JobConfiguration jobConfiguration);
+
 }
