http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromDestroyer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromDestroyer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromDestroyer.java new file mode 100644 index 0000000..2df193c --- /dev/null +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromDestroyer.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.connector.jdbc; + +import org.apache.log4j.Logger; +import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration; +import org.apache.sqoop.job.etl.Destroyer; +import org.apache.sqoop.job.etl.DestroyerContext; + +public class GenericJdbcFromDestroyer extends Destroyer<ConnectionConfiguration, FromJobConfiguration> { + + private static final Logger LOG = + Logger.getLogger(GenericJdbcFromDestroyer.class); + + @Override + public void destroy(DestroyerContext context, ConnectionConfiguration connection, FromJobConfiguration job) { + LOG.info("Running generic JDBC connector destroyer"); + } + +}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java new file mode 100644 index 0000000..63c2609 --- /dev/null +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java @@ -0,0 +1,322 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.connector.jdbc; + +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.LinkedList; +import java.util.List; + +import org.apache.commons.lang.StringUtils; +import org.apache.log4j.Logger; +import org.apache.sqoop.common.MutableContext; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration; +import org.apache.sqoop.connector.jdbc.util.SqlTypesUtils; +import org.apache.sqoop.job.Constants; +import org.apache.sqoop.job.etl.Initializer; +import org.apache.sqoop.job.etl.InitializerContext; +import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.Column; +import org.apache.sqoop.utils.ClassUtils; + +public class GenericJdbcFromInitializer extends Initializer<ConnectionConfiguration, FromJobConfiguration> { + + private static final Logger LOG = + Logger.getLogger(GenericJdbcFromInitializer.class); + + private GenericJdbcExecutor executor; + + @Override + public void initialize(InitializerContext context, ConnectionConfiguration connection, FromJobConfiguration job) { + configureJdbcProperties(context.getContext(), connection, job); + try { + configurePartitionProperties(context.getContext(), connection, job); + configureTableProperties(context.getContext(), connection, job); + } finally { + executor.close(); + } + } + + @Override + public List<String> getJars(InitializerContext context, ConnectionConfiguration connection, FromJobConfiguration job) { + List<String> jars = new LinkedList<String>(); + + jars.add(ClassUtils.jarForClass(connection.connection.jdbcDriver)); + + return jars; + } + + @Override + public Schema getSchema(InitializerContext context, ConnectionConfiguration connectionConfiguration, FromJobConfiguration fromJobConfiguration) { + configureJdbcProperties(context.getContext(), connectionConfiguration, 
fromJobConfiguration); + + String schemaName = fromJobConfiguration.table.tableName; + if(schemaName == null) { + schemaName = "Query"; + } else if(fromJobConfiguration.table.schemaName != null) { + schemaName = fromJobConfiguration.table.schemaName + "." + schemaName; + } + + Schema schema = new Schema(schemaName); + ResultSet rs = null; + ResultSetMetaData rsmt = null; + try { + rs = executor.executeQuery( + context.getString(GenericJdbcConnectorConstants.CONNECTOR_FROM_JDBC_DATA_SQL) + .replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0") + ); + + rsmt = rs.getMetaData(); + for (int i = 1 ; i <= rsmt.getColumnCount(); i++) { + Column column = SqlTypesUtils.sqlTypeToAbstractType(rsmt.getColumnType(i)); + + String columnName = rsmt.getColumnName(i); + if (columnName == null || columnName.equals("")) { + columnName = rsmt.getColumnLabel(i); + if (null == columnName) { + columnName = "Column " + i; + } + } + + column.setName(columnName); + schema.addColumn(column); + } + + return schema; + } catch (SQLException e) { + throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0016, e); + } finally { + if(rs != null) { + try { + rs.close(); + } catch (SQLException e) { + LOG.info("Ignoring exception while closing ResultSet", e); + } + } + } + } + + private void configureJdbcProperties(MutableContext context, ConnectionConfiguration connectionConfig, FromJobConfiguration jobConfig) { + String driver = connectionConfig.connection.jdbcDriver; + String url = connectionConfig.connection.connectionString; + String username = connectionConfig.connection.username; + String password = connectionConfig.connection.password; + + assert driver != null; + assert url != null; + + executor = new GenericJdbcExecutor(driver, url, username, password); + } + + private void configurePartitionProperties(MutableContext context, ConnectionConfiguration connectionConfig, FromJobConfiguration jobConfig) { + // ----- configure column name ----- + + String 
partitionColumnName = jobConfig.table.partitionColumn; + + if (partitionColumnName == null) { + // if column is not specified by the user, + // find the primary key of the table (when there is a table). + String tableName = jobConfig.table.tableName; + if (tableName != null) { + partitionColumnName = executor.getPrimaryKey(tableName); + } + } + + if (partitionColumnName != null) { + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, + partitionColumnName); + + } else { + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0005); + } + + // ----- configure column type, min value, and max value ----- + + String minMaxQuery = jobConfig.table.boundaryQuery; + + if (minMaxQuery == null) { + StringBuilder builder = new StringBuilder(); + + String schemaName = jobConfig.table.schemaName; + String tableName = jobConfig.table.tableName; + String tableSql = jobConfig.table.sql; + + if (tableName != null && tableSql != null) { + // when both table name and table sql are specified: + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007); + + } else if (tableName != null) { + // when table name is specified: + + // For databases that support schemas (IE: postgresql). + String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." 
+ executor.delimitIdentifier(tableName); + + String column = partitionColumnName; + builder.append("SELECT MIN("); + builder.append(column); + builder.append("), MAX("); + builder.append(column); + builder.append(") FROM "); + builder.append(fullTableName); + + } else if (tableSql != null) { + String column = executor.qualify( + partitionColumnName, GenericJdbcConnectorConstants.SUBQUERY_ALIAS); + builder.append("SELECT MIN("); + builder.append(column); + builder.append("), MAX("); + builder.append(column); + builder.append(") FROM "); + builder.append("("); + builder.append(tableSql.replace( + GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 1")); + builder.append(") "); + builder.append(GenericJdbcConnectorConstants.SUBQUERY_ALIAS); + + } else { + // when neither are specified: + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008); + } + + minMaxQuery = builder.toString(); + } + + + LOG.debug("Using minMaxQuery: " + minMaxQuery); + ResultSet rs = executor.executeQuery(minMaxQuery); + try { + ResultSetMetaData rsmd = rs.getMetaData(); + if (rsmd.getColumnCount() != 2) { + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006); + } + + rs.next(); + + int columnType = rsmd.getColumnType(1); + String min = rs.getString(1); + String max = rs.getString(2); + + LOG.info("Boundaries: min=" + min + ", max=" + max + ", columnType=" + columnType); + + context.setInteger(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, columnType); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, min); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, max); + + } catch (SQLException e) { + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006, e); + } + } + + private void configureTableProperties(MutableContext context, ConnectionConfiguration connectionConfig, FromJobConfiguration jobConfig) { + String 
dataSql; + String fieldNames; + + String schemaName = jobConfig.table.schemaName; + String tableName = jobConfig.table.tableName; + String tableSql = jobConfig.table.sql; + String tableColumns = jobConfig.table.columns; + + if (tableName != null && tableSql != null) { + // when both table name and table sql are specified: + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007); + + } else if (tableName != null) { + // when table name is specified: + + // For databases that support schemas (IE: postgresql). + String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); + + if (tableColumns == null) { + StringBuilder builder = new StringBuilder(); + builder.append("SELECT * FROM "); + builder.append(fullTableName); + builder.append(" WHERE "); + builder.append(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN); + dataSql = builder.toString(); + + String[] queryColumns = executor.getQueryColumns(dataSql.replace( + GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0")); + fieldNames = StringUtils.join(queryColumns, ','); + + } else { + StringBuilder builder = new StringBuilder(); + builder.append("SELECT "); + builder.append(tableColumns); + builder.append(" FROM "); + builder.append(fullTableName); + builder.append(" WHERE "); + builder.append(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN); + dataSql = builder.toString(); + + fieldNames = tableColumns; + } + } else if (tableSql != null) { + // when table sql is specified: + + assert tableSql.contains(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN); + + if (tableColumns == null) { + dataSql = tableSql; + + String[] queryColumns = executor.getQueryColumns(dataSql.replace( + GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0")); + fieldNames = StringUtils.join(queryColumns, ','); + + } else { + String[] columns = StringUtils.split(tableColumns, ','); + 
StringBuilder builder = new StringBuilder(); + builder.append("SELECT "); + builder.append(executor.qualify( + columns[0], GenericJdbcConnectorConstants.SUBQUERY_ALIAS)); + for (int i = 1; i < columns.length; i++) { + builder.append(","); + builder.append(executor.qualify( + columns[i], GenericJdbcConnectorConstants.SUBQUERY_ALIAS)); + } + builder.append(" FROM "); + builder.append("("); + builder.append(tableSql); + builder.append(") "); + builder.append(GenericJdbcConnectorConstants.SUBQUERY_ALIAS); + dataSql = builder.toString(); + + fieldNames = tableColumns; + } + } else { + // when neither are specified: + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008); + } + + LOG.info("Using dataSql: " + dataSql); + LOG.info("Field names: " + fieldNames); + + context.setString(GenericJdbcConnectorConstants.CONNECTOR_FROM_JDBC_DATA_SQL, dataSql); + context.setString(Constants.JOB_ETL_FIELD_NAMES, fieldNames); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java deleted file mode 100644 index 2cf07fe..0000000 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.connector.jdbc; - -import org.apache.log4j.Logger; -import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; -import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration; -import org.apache.sqoop.job.etl.Destroyer; -import org.apache.sqoop.job.etl.DestroyerContext; - -public class GenericJdbcImportDestroyer extends Destroyer<ConnectionConfiguration, ImportJobConfiguration> { - - private static final Logger LOG = - Logger.getLogger(GenericJdbcImportDestroyer.class); - - @Override - public void destroy(DestroyerContext context, ConnectionConfiguration connection, ImportJobConfiguration job) { - LOG.info("Running generic JDBC connector destroyer"); - } - -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java deleted file mode 100644 index 3f9aa9b..0000000 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - 
* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.connector.jdbc; - -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; - -import org.apache.log4j.Logger; -import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; -import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration; -import org.apache.sqoop.job.etl.ExtractorContext; -import org.apache.sqoop.job.etl.Extractor; - -public class GenericJdbcImportExtractor extends Extractor<ConnectionConfiguration, ImportJobConfiguration, GenericJdbcImportPartition> { - - public static final Logger LOG = Logger.getLogger(GenericJdbcImportExtractor.class); - - private long rowsRead = 0; - @Override - public void extract(ExtractorContext context, ConnectionConfiguration connection, ImportJobConfiguration job, GenericJdbcImportPartition partition) { - String driver = connection.connection.jdbcDriver; - String url = connection.connection.connectionString; - String username = connection.connection.username; - String password = connection.connection.password; - GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, 
password); - - String query = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL); - String conditions = partition.getConditions(); - query = query.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, conditions); - LOG.info("Using query: " + query); - - rowsRead = 0; - ResultSet resultSet = executor.executeQuery(query); - - try { - ResultSetMetaData metaData = resultSet.getMetaData(); - int column = metaData.getColumnCount(); - while (resultSet.next()) { - Object[] array = new Object[column]; - for (int i = 0; i< column; i++) { - array[i] = resultSet.getObject(i + 1) == null ? GenericJdbcConnectorConstants.SQL_NULL_VALUE - : resultSet.getObject(i + 1); - } - context.getDataWriter().writeArrayRecord(array); - rowsRead++; - } - } catch (SQLException e) { - throw new SqoopException( - GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0004, e); - - } finally { - executor.close(); - } - } - - @Override - public long getRowsRead() { - return rowsRead; - } - -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java deleted file mode 100644 index 2ad3cb2..0000000 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java +++ /dev/null @@ -1,322 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.connector.jdbc; - -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.util.LinkedList; -import java.util.List; - -import org.apache.commons.lang.StringUtils; -import org.apache.log4j.Logger; -import org.apache.sqoop.common.MutableContext; -import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; -import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration; -import org.apache.sqoop.connector.jdbc.util.SqlTypesUtils; -import org.apache.sqoop.job.Constants; -import org.apache.sqoop.job.etl.Initializer; -import org.apache.sqoop.job.etl.InitializerContext; -import org.apache.sqoop.schema.Schema; -import org.apache.sqoop.schema.type.Column; -import org.apache.sqoop.utils.ClassUtils; - -public class GenericJdbcImportInitializer extends Initializer<ConnectionConfiguration, ImportJobConfiguration> { - - private static final Logger LOG = - Logger.getLogger(GenericJdbcImportInitializer.class); - - private GenericJdbcExecutor executor; - - @Override - public void initialize(InitializerContext context, ConnectionConfiguration connection, ImportJobConfiguration job) { - configureJdbcProperties(context.getContext(), connection, job); - try { - configurePartitionProperties(context.getContext(), connection, job); - 
configureTableProperties(context.getContext(), connection, job); - } finally { - executor.close(); - } - } - - @Override - public List<String> getJars(InitializerContext context, ConnectionConfiguration connection, ImportJobConfiguration job) { - List<String> jars = new LinkedList<String>(); - - jars.add(ClassUtils.jarForClass(connection.connection.jdbcDriver)); - - return jars; - } - - @Override - public Schema getSchema(InitializerContext context, ConnectionConfiguration connectionConfiguration, ImportJobConfiguration importJobConfiguration) { - configureJdbcProperties(context.getContext(), connectionConfiguration, importJobConfiguration); - - String schemaName = importJobConfiguration.table.tableName; - if(schemaName == null) { - schemaName = "Query"; - } else if(importJobConfiguration.table.schemaName != null) { - schemaName = importJobConfiguration.table.schemaName + "." + schemaName; - } - - Schema schema = new Schema(schemaName); - ResultSet rs = null; - ResultSetMetaData rsmt = null; - try { - rs = executor.executeQuery( - context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL) - .replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0") - ); - - rsmt = rs.getMetaData(); - for (int i = 1 ; i <= rsmt.getColumnCount(); i++) { - Column column = SqlTypesUtils.sqlTypeToAbstractType(rsmt.getColumnType(i)); - - String columnName = rsmt.getColumnName(i); - if (columnName == null || columnName.equals("")) { - columnName = rsmt.getColumnLabel(i); - if (null == columnName) { - columnName = "Column " + i; - } - } - - column.setName(columnName); - schema.addColumn(column); - } - - return schema; - } catch (SQLException e) { - throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0016, e); - } finally { - if(rs != null) { - try { - rs.close(); - } catch (SQLException e) { - LOG.info("Ignoring exception while closing ResultSet", e); - } - } - } - } - - private void configureJdbcProperties(MutableContext context, 
ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) { - String driver = connectionConfig.connection.jdbcDriver; - String url = connectionConfig.connection.connectionString; - String username = connectionConfig.connection.username; - String password = connectionConfig.connection.password; - - assert driver != null; - assert url != null; - - executor = new GenericJdbcExecutor(driver, url, username, password); - } - - private void configurePartitionProperties(MutableContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) { - // ----- configure column name ----- - - String partitionColumnName = jobConfig.table.partitionColumn; - - if (partitionColumnName == null) { - // if column is not specified by the user, - // find the primary key of the table (when there is a table). - String tableName = jobConfig.table.tableName; - if (tableName != null) { - partitionColumnName = executor.getPrimaryKey(tableName); - } - } - - if (partitionColumnName != null) { - context.setString( - GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, - partitionColumnName); - - } else { - throw new SqoopException( - GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0005); - } - - // ----- configure column type, min value, and max value ----- - - String minMaxQuery = jobConfig.table.boundaryQuery; - - if (minMaxQuery == null) { - StringBuilder builder = new StringBuilder(); - - String schemaName = jobConfig.table.schemaName; - String tableName = jobConfig.table.tableName; - String tableSql = jobConfig.table.sql; - - if (tableName != null && tableSql != null) { - // when both table name and table sql are specified: - throw new SqoopException( - GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007); - - } else if (tableName != null) { - // when table name is specified: - - // For databases that support schemas (IE: postgresql). - String fullTableName = (schemaName == null) ? 
executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); - - String column = partitionColumnName; - builder.append("SELECT MIN("); - builder.append(column); - builder.append("), MAX("); - builder.append(column); - builder.append(") FROM "); - builder.append(fullTableName); - - } else if (tableSql != null) { - String column = executor.qualify( - partitionColumnName, GenericJdbcConnectorConstants.SUBQUERY_ALIAS); - builder.append("SELECT MIN("); - builder.append(column); - builder.append("), MAX("); - builder.append(column); - builder.append(") FROM "); - builder.append("("); - builder.append(tableSql.replace( - GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 1")); - builder.append(") "); - builder.append(GenericJdbcConnectorConstants.SUBQUERY_ALIAS); - - } else { - // when neither are specified: - throw new SqoopException( - GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008); - } - - minMaxQuery = builder.toString(); - } - - - LOG.debug("Using minMaxQuery: " + minMaxQuery); - ResultSet rs = executor.executeQuery(minMaxQuery); - try { - ResultSetMetaData rsmd = rs.getMetaData(); - if (rsmd.getColumnCount() != 2) { - throw new SqoopException( - GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006); - } - - rs.next(); - - int columnType = rsmd.getColumnType(1); - String min = rs.getString(1); - String max = rs.getString(2); - - LOG.info("Boundaries: min=" + min + ", max=" + max + ", columnType=" + columnType); - - context.setInteger(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, columnType); - context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, min); - context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, max); - - } catch (SQLException e) { - throw new SqoopException( - GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006, e); - } - } - - private void configureTableProperties(MutableContext context, 
ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) { - String dataSql; - String fieldNames; - - String schemaName = jobConfig.table.schemaName; - String tableName = jobConfig.table.tableName; - String tableSql = jobConfig.table.sql; - String tableColumns = jobConfig.table.columns; - - if (tableName != null && tableSql != null) { - // when both table name and table sql are specified: - throw new SqoopException( - GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007); - - } else if (tableName != null) { - // when table name is specified: - - // For databases that support schemas (IE: postgresql). - String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); - - if (tableColumns == null) { - StringBuilder builder = new StringBuilder(); - builder.append("SELECT * FROM "); - builder.append(fullTableName); - builder.append(" WHERE "); - builder.append(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN); - dataSql = builder.toString(); - - String[] queryColumns = executor.getQueryColumns(dataSql.replace( - GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0")); - fieldNames = StringUtils.join(queryColumns, ','); - - } else { - StringBuilder builder = new StringBuilder(); - builder.append("SELECT "); - builder.append(tableColumns); - builder.append(" FROM "); - builder.append(fullTableName); - builder.append(" WHERE "); - builder.append(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN); - dataSql = builder.toString(); - - fieldNames = tableColumns; - } - } else if (tableSql != null) { - // when table sql is specified: - - assert tableSql.contains(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN); - - if (tableColumns == null) { - dataSql = tableSql; - - String[] queryColumns = executor.getQueryColumns(dataSql.replace( - GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0")); - fieldNames = StringUtils.join(queryColumns, 
','); - - } else { - String[] columns = StringUtils.split(tableColumns, ','); - StringBuilder builder = new StringBuilder(); - builder.append("SELECT "); - builder.append(executor.qualify( - columns[0], GenericJdbcConnectorConstants.SUBQUERY_ALIAS)); - for (int i = 1; i < columns.length; i++) { - builder.append(","); - builder.append(executor.qualify( - columns[i], GenericJdbcConnectorConstants.SUBQUERY_ALIAS)); - } - builder.append(" FROM "); - builder.append("("); - builder.append(tableSql); - builder.append(") "); - builder.append(GenericJdbcConnectorConstants.SUBQUERY_ALIAS); - dataSql = builder.toString(); - - fieldNames = tableColumns; - } - } else { - // when neither are specified: - throw new SqoopException( - GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008); - } - - LOG.info("Using dataSql: " + dataSql); - LOG.info("Field names: " + fieldNames); - - context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL, dataSql); - context.setString(Constants.JOB_ETL_FIELD_NAMES, fieldNames); - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartition.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartition.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartition.java deleted file mode 100644 index 66ed556..0000000 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartition.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.connector.jdbc; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.sqoop.job.etl.Partition; - -public class GenericJdbcImportPartition extends Partition { - - private String conditions; - - public void setConditions(String conditions) { - this.conditions = conditions; - } - - public String getConditions() { - return conditions; - } - - @Override - public void readFields(DataInput in) throws IOException { - conditions = in.readUTF(); - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeUTF(conditions); - } - - @Override - public String toString() { - return conditions; - } - -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java deleted file mode 100644 index d103223..0000000 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java +++ /dev/null @@ -1,605 +0,0 @@ -/** - * Licensed to the 
Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.connector.jdbc; - -import java.math.BigDecimal; -import java.sql.Date; -import java.sql.Time; -import java.sql.Timestamp; -import java.sql.Types; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.LinkedList; -import java.util.List; -import java.util.TimeZone; - -import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; -import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration; -import org.apache.sqoop.job.etl.Partition; -import org.apache.sqoop.job.etl.Partitioner; -import org.apache.sqoop.job.etl.PartitionerContext; - -public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfiguration, ImportJobConfiguration> { - - private static final BigDecimal NUMERIC_MIN_INCREMENT = new BigDecimal(10000 * Double.MIN_VALUE); - - - private long numberPartitions; - private String partitionColumnName; - private int partitionColumnType; - private String partitionMinValue; - private String partitionMaxValue; - private Boolean partitionColumnNull; - - @Override - public List<Partition> getPartitions(PartitionerContext 
context,ConnectionConfiguration connection, ImportJobConfiguration job) { - List<Partition> partitions = new LinkedList<Partition>(); - - numberPartitions = context.getMaxPartitions(); - partitionColumnName = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME); - partitionColumnType = context.getInt(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, -1); - partitionMinValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE); - partitionMaxValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE); - - partitionColumnNull = job.table.partitionColumnNull; - if (partitionColumnNull == null) { - partitionColumnNull = false; - } - - if (partitionMinValue == null && partitionMaxValue == null) { - GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); - partition.setConditions(partitionColumnName + " IS NULL"); - partitions.add(partition); - return partitions; - } - - if (partitionColumnNull) { - GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); - partition.setConditions(partitionColumnName + " IS NULL"); - partitions.add(partition); - numberPartitions -= 1; - } - - switch (partitionColumnType) { - case Types.TINYINT: - case Types.SMALLINT: - case Types.INTEGER: - case Types.BIGINT: - // Integer column - partitions.addAll(partitionIntegerColumn()); - break; - - case Types.REAL: - case Types.FLOAT: - case Types.DOUBLE: - // Floating point column - partitions.addAll(partitionFloatingPointColumn()); - break; - - case Types.NUMERIC: - case Types.DECIMAL: - // Decimal column - partitions.addAll(partitionNumericColumn()); - break; - - case Types.BIT: - case Types.BOOLEAN: - // Boolean column - return partitionBooleanColumn(); - - case Types.DATE: - case Types.TIME: - case Types.TIMESTAMP: - // Date time column - partitions.addAll(partitionDateTimeColumn()); - break; - - case Types.CHAR: - case Types.VARCHAR: - case 
Types.LONGVARCHAR: - // Text column - partitions.addAll(partitionTextColumn()); - break; - - default: - throw new SqoopException( - GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011, - String.valueOf(partitionColumnType)); - } - - return partitions; - } - - protected List<Partition> partitionDateTimeColumn() { - List<Partition> partitions = new LinkedList<Partition>(); - - long minDateValue = 0; - long maxDateValue = 0; - SimpleDateFormat sdf = null; - switch(partitionColumnType) { - case Types.DATE: - sdf = new SimpleDateFormat("yyyy-MM-dd"); - minDateValue = Date.valueOf(partitionMinValue).getTime(); - maxDateValue = Date.valueOf(partitionMaxValue).getTime(); - break; - case Types.TIME: - sdf = new SimpleDateFormat("HH:mm:ss"); - minDateValue = Time.valueOf(partitionMinValue).getTime(); - maxDateValue = Time.valueOf(partitionMaxValue).getTime(); - break; - case Types.TIMESTAMP: - sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); - minDateValue = Timestamp.valueOf(partitionMinValue).getTime(); - maxDateValue = Timestamp.valueOf(partitionMaxValue).getTime(); - break; - } - - - minDateValue += TimeZone.getDefault().getOffset(minDateValue); - maxDateValue += TimeZone.getDefault().getOffset(maxDateValue); - - sdf.setTimeZone(TimeZone.getTimeZone("GMT")); - - long interval = (maxDateValue - minDateValue) / numberPartitions; - long remainder = (maxDateValue - minDateValue) % numberPartitions; - - if (interval == 0) { - numberPartitions = (int)remainder; - } - - long lowerBound; - long upperBound = minDateValue; - - Object objLB = null; - Object objUB = null; - - for (int i = 1; i < numberPartitions; i++) { - lowerBound = upperBound; - upperBound = lowerBound + interval; - upperBound += (i <= remainder) ? 
1 : 0; - - switch(partitionColumnType) { - case Types.DATE: - objLB = new Date(lowerBound); - objUB = new Date(upperBound); - break; - case Types.TIME: - objLB = new Time(lowerBound); - objUB = new Time(upperBound); - - break; - case Types.TIMESTAMP: - objLB = new Timestamp(lowerBound); - objUB = new Timestamp(upperBound); - break; - } - - GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); - partition.setConditions( - constructDateConditions(sdf, objLB, objUB, false)); - partitions.add(partition); - } - - switch(partitionColumnType) { - case Types.DATE: - objLB = new Date(upperBound); - objUB = new Date(maxDateValue); - break; - case Types.TIME: - objLB = new Time(upperBound); - objUB = new Time(maxDateValue); - break; - case Types.TIMESTAMP: - objLB = new Timestamp(upperBound); - objUB = new Timestamp(maxDateValue); - break; - } - - - GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); - partition.setConditions( - constructDateConditions(sdf, objLB, objUB, true)); - partitions.add(partition); - return partitions; - } - - protected List<Partition> partitionTextColumn() { - List<Partition> partitions = new LinkedList<Partition>(); - - String minStringValue = null; - String maxStringValue = null; - - // Remove common prefix if any as it does not affect outcome. - int maxPrefixLen = Math.min(partitionMinValue.length(), - partitionMaxValue.length()); - // Calculate common prefix length - int cpLen = 0; - - for (cpLen = 0; cpLen < maxPrefixLen; cpLen++) { - char c1 = partitionMinValue.charAt(cpLen); - char c2 = partitionMaxValue.charAt(cpLen); - if (c1 != c2) { - break; - } - } - - // The common prefix has length 'sharedLen'. Extract it from both. 
- String prefix = partitionMinValue.substring(0, cpLen); - minStringValue = partitionMinValue.substring(cpLen); - maxStringValue = partitionMaxValue.substring(cpLen); - - BigDecimal minStringBD = textToBigDecimal(minStringValue); - BigDecimal maxStringBD = textToBigDecimal(maxStringValue); - - // Having one single value means that we can create only one single split - if(minStringBD.equals(maxStringBD)) { - GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); - partition.setConditions(constructTextConditions(prefix, 0, 0, - partitionMinValue, partitionMaxValue, true, true)); - partitions.add(partition); - return partitions; - } - - // Get all the split points together. - List<BigDecimal> splitPoints = new LinkedList<BigDecimal>(); - - BigDecimal splitSize = divide(maxStringBD.subtract(minStringBD), - new BigDecimal(numberPartitions)); - if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) { - splitSize = NUMERIC_MIN_INCREMENT; - } - - BigDecimal curVal = minStringBD; - - int parts = 0; - - while (curVal.compareTo(maxStringBD) <= 0 && parts < numberPartitions) { - splitPoints.add(curVal); - curVal = curVal.add(splitSize); - // bigDecimalToText approximates to next comparison location. - // Make sure we are still in range - String text = bigDecimalToText(curVal); - curVal = textToBigDecimal(text); - ++parts; - } - - if (splitPoints.size() == 0 - || splitPoints.get(0).compareTo(minStringBD) != 0) { - splitPoints.add(0, minStringBD); - } - - if (splitPoints.get(splitPoints.size() - 1).compareTo(maxStringBD) != 0 - || splitPoints.size() == 1) { - splitPoints.add(maxStringBD); - } - - // Turn the split points into a set of string intervals. 
- BigDecimal start = splitPoints.get(0); - for (int i = 1; i < splitPoints.size(); i++) { - BigDecimal end = splitPoints.get(i); - - GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); - partition.setConditions(constructTextConditions(prefix, start, end, - partitionMinValue, partitionMaxValue, i == 1, i == splitPoints.size() - 1)); - partitions.add(partition); - - start = end; - } - - return partitions; - } - - - protected List<Partition> partitionIntegerColumn() { - List<Partition> partitions = new LinkedList<Partition>(); - - long minValue = partitionMinValue == null ? Long.MIN_VALUE - : Long.parseLong(partitionMinValue); - long maxValue = Long.parseLong(partitionMaxValue); - - long interval = (maxValue - minValue) / numberPartitions; - long remainder = (maxValue - minValue) % numberPartitions; - - if (interval == 0) { - numberPartitions = (int)remainder; - } - - long lowerBound; - long upperBound = minValue; - for (int i = 1; i < numberPartitions; i++) { - lowerBound = upperBound; - upperBound = lowerBound + interval; - upperBound += (i <= remainder) ? 1 : 0; - - GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); - partition.setConditions( - constructConditions(lowerBound, upperBound, false)); - partitions.add(partition); - } - - GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); - partition.setConditions( - constructConditions(upperBound, maxValue, true)); - partitions.add(partition); - - return partitions; - } - - protected List<Partition> partitionFloatingPointColumn() { - List<Partition> partitions = new LinkedList<Partition>(); - - - double minValue = partitionMinValue == null ? 
Double.MIN_VALUE - : Double.parseDouble(partitionMinValue); - double maxValue = Double.parseDouble(partitionMaxValue); - - double interval = (maxValue - minValue) / numberPartitions; - - double lowerBound; - double upperBound = minValue; - for (int i = 1; i < numberPartitions; i++) { - lowerBound = upperBound; - upperBound = lowerBound + interval; - - GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); - partition.setConditions( - constructConditions(lowerBound, upperBound, false)); - partitions.add(partition); - } - - GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); - partition.setConditions( - constructConditions(upperBound, maxValue, true)); - partitions.add(partition); - - return partitions; - } - - protected List<Partition> partitionNumericColumn() { - List<Partition> partitions = new LinkedList<Partition>(); - // Having one end in null is not supported - if (partitionMinValue == null || partitionMaxValue == null) { - throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0015); - } - - BigDecimal minValue = new BigDecimal(partitionMinValue); - BigDecimal maxValue = new BigDecimal(partitionMaxValue); - - // Having one single value means that we can create only one single split - if(minValue.equals(maxValue)) { - GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); - partition.setConditions(constructConditions(minValue)); - partitions.add(partition); - return partitions; - } - - // Get all the split points together. 
- List<BigDecimal> splitPoints = new LinkedList<BigDecimal>(); - - BigDecimal splitSize = divide(maxValue.subtract(minValue), new BigDecimal(numberPartitions)); - - if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) { - splitSize = NUMERIC_MIN_INCREMENT; - } - - BigDecimal curVal = minValue; - - while (curVal.compareTo(maxValue) <= 0) { - splitPoints.add(curVal); - curVal = curVal.add(splitSize); - } - - if (splitPoints.get(splitPoints.size() - 1).compareTo(maxValue) != 0 || splitPoints.size() == 1) { - splitPoints.remove(splitPoints.size() - 1); - splitPoints.add(maxValue); - } - - // Turn the split points into a set of intervals. - BigDecimal start = splitPoints.get(0); - for (int i = 1; i < splitPoints.size(); i++) { - BigDecimal end = splitPoints.get(i); - - GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); - partition.setConditions(constructConditions(start, end, i == splitPoints.size() - 1)); - partitions.add(partition); - - start = end; - } - - return partitions; - } - - protected List<Partition> partitionBooleanColumn() { - List<Partition> partitions = new LinkedList<Partition>(); - - - Boolean minValue = parseBooleanValue(partitionMinValue); - Boolean maxValue = parseBooleanValue(partitionMaxValue); - - StringBuilder conditions = new StringBuilder(); - - // Having one single value means that we can create only one single split - if(minValue.equals(maxValue)) { - GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); - - conditions.append(partitionColumnName).append(" = ") - .append(maxValue); - partition.setConditions(conditions.toString()); - partitions.add(partition); - return partitions; - } - - GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); - - if (partitionMinValue == null) { - conditions = new StringBuilder(); - conditions.append(partitionColumnName).append(" IS NULL"); - partition.setConditions(conditions.toString()); - partitions.add(partition); - } - partition = new 
GenericJdbcImportPartition(); - conditions = new StringBuilder(); - conditions.append(partitionColumnName).append(" = TRUE"); - partition.setConditions(conditions.toString()); - partitions.add(partition); - partition = new GenericJdbcImportPartition(); - conditions = new StringBuilder(); - conditions.append(partitionColumnName).append(" = FALSE"); - partition.setConditions(conditions.toString()); - partitions.add(partition); - return partitions; - } - - private Boolean parseBooleanValue(String value) { - if (value == null) { - return null; - } - if (value.equals("1")) { - return Boolean.TRUE; - } else if (value.equals("0")) { - return Boolean.FALSE; - } else { - return Boolean.parseBoolean(value); - } - } - - protected BigDecimal divide(BigDecimal numerator, BigDecimal denominator) { - try { - return numerator.divide(denominator); - } catch (ArithmeticException ae) { - return numerator.divide(denominator, BigDecimal.ROUND_HALF_UP); - } - } - - protected String constructConditions( - Object lowerBound, Object upperBound, boolean lastOne) { - StringBuilder conditions = new StringBuilder(); - conditions.append(lowerBound); - conditions.append(" <= "); - conditions.append(partitionColumnName); - conditions.append(" AND "); - conditions.append(partitionColumnName); - conditions.append(lastOne ? 
" <= " : " < "); - conditions.append(upperBound); - return conditions.toString(); - } - - protected String constructConditions(Object value) { - return new StringBuilder() - .append(partitionColumnName) - .append(" = ") - .append(value) - .toString() - ; - } - - protected String constructDateConditions(SimpleDateFormat sdf, - Object lowerBound, Object upperBound, boolean lastOne) { - StringBuilder conditions = new StringBuilder(); - conditions.append('\'').append(sdf.format((java.util.Date)lowerBound)).append('\''); - conditions.append(" <= "); - conditions.append(partitionColumnName); - conditions.append(" AND "); - conditions.append(partitionColumnName); - conditions.append(lastOne ? " <= " : " < "); - conditions.append('\'').append(sdf.format((java.util.Date)upperBound)).append('\''); - return conditions.toString(); - } - - protected String constructTextConditions(String prefix, Object lowerBound, Object upperBound, - String lowerStringBound, String upperStringBound, boolean firstOne, boolean lastOne) { - StringBuilder conditions = new StringBuilder(); - String lbString = prefix + bigDecimalToText((BigDecimal)lowerBound); - String ubString = prefix + bigDecimalToText((BigDecimal)upperBound); - conditions.append('\'').append(firstOne ? lowerStringBound : lbString).append('\''); - conditions.append(" <= "); - conditions.append(partitionColumnName); - conditions.append(" AND "); - conditions.append(partitionColumnName); - conditions.append(lastOne ? " <= " : " < "); - conditions.append('\'').append(lastOne ? upperStringBound : ubString).append('\''); - return conditions.toString(); - } - - /** - * Converts a string to a BigDecimal representation in Base 2^21 format. - * The maximum Unicode code point value defined is 10FFFF. Although - * not all database system support UTF16 and mostly we expect UCS2 - * characters only, for completeness, we assume that all the unicode - * characters are supported. 
- * Given a string 's' containing characters s_0, s_1,..s_n, - * the string is interpreted as the number: 0.s_0 s_1 s_2 s_3 s_48) - * This can be split and each split point can be converted back to - * a string value for comparison purposes. The number of characters - * is restricted to prevent repeating fractions and rounding errors - * towards the higher fraction positions. - */ - private static final BigDecimal UNITS_BASE = new BigDecimal(0x200000); - private static final int MAX_CHARS_TO_CONVERT = 4; - - private BigDecimal textToBigDecimal(String str) { - BigDecimal result = BigDecimal.ZERO; - BigDecimal divisor = UNITS_BASE; - - int len = Math.min(str.length(), MAX_CHARS_TO_CONVERT); - - for (int n = 0; n < len; ) { - int codePoint = str.codePointAt(n); - n += Character.charCount(codePoint); - BigDecimal val = divide(new BigDecimal(codePoint), divisor); - result = result.add(val); - divisor = divisor.multiply(UNITS_BASE); - } - - return result; - } - - private String bigDecimalToText(BigDecimal bd) { - BigDecimal curVal = bd.stripTrailingZeros(); - StringBuilder sb = new StringBuilder(); - - for (int n = 0; n < MAX_CHARS_TO_CONVERT; ++n) { - curVal = curVal.multiply(UNITS_BASE); - int cp = curVal.intValue(); - if (0 >= cp) { - break; - } - - if (!Character.isDefined(cp)) { - int t_cp = Character.MAX_CODE_POINT < cp ? 
1 : cp; - // We are guaranteed to find at least one character - while(!Character.isDefined(t_cp)) { - ++t_cp; - if (t_cp == cp) { - break; - } - if (t_cp >= Character.MAX_CODE_POINT || t_cp <= 0) { - t_cp = 1; - } - } - cp = t_cp; - } - curVal = curVal.subtract(new BigDecimal(cp)); - sb.append(Character.toChars(cp)); - } - - return sb.toString(); - } - -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java new file mode 100644 index 0000000..7d583c5 --- /dev/null +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.connector.jdbc; + +import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration; +import org.apache.sqoop.job.etl.Loader; +import org.apache.sqoop.job.etl.LoaderContext; + +public class GenericJdbcLoader extends Loader<ConnectionConfiguration, ToJobConfiguration> { + + public static final int DEFAULT_ROWS_PER_BATCH = 100; + public static final int DEFAULT_BATCHES_PER_TRANSACTION = 100; + private int rowsPerBatch = DEFAULT_ROWS_PER_BATCH; + private int batchesPerTransaction = DEFAULT_BATCHES_PER_TRANSACTION; + + @Override + public void load(LoaderContext context, ConnectionConfiguration connection, ToJobConfiguration job) throws Exception{ + String driver = connection.connection.jdbcDriver; + String url = connection.connection.connectionString; + String username = connection.connection.username; + String password = connection.connection.password; + GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password); + executor.setAutoCommit(false); + + String sql = context.getString(GenericJdbcConnectorConstants.CONNECTOR_TO_JDBC_DATA_SQL); + executor.beginBatch(sql); + try { + int numberOfRows = 0; + int numberOfBatches = 0; + Object[] array; + + while ((array = context.getDataReader().readArrayRecord()) != null) { + numberOfRows++; + executor.addBatch(array); + + if (numberOfRows == rowsPerBatch) { + numberOfBatches++; + if (numberOfBatches == batchesPerTransaction) { + executor.executeBatch(true); + numberOfBatches = 0; + } else { + executor.executeBatch(false); + } + numberOfRows = 0; + } + } + + if (numberOfRows != 0 || numberOfBatches != 0) { + // execute and commit the remaining rows + executor.executeBatch(true); + } + + executor.endBatch(); + + } finally { + executor.close(); + } + } + +} 
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartition.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartition.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartition.java new file mode 100644 index 0000000..65400ef --- /dev/null +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartition.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.connector.jdbc; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.sqoop.job.etl.Partition; + +public class GenericJdbcPartition extends Partition { + + private String conditions; + + public void setConditions(String conditions) { + this.conditions = conditions; + } + + public String getConditions() { + return conditions; + } + + @Override + public void readFields(DataInput in) throws IOException { + conditions = in.readUTF(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(conditions); + } + + @Override + public String toString() { + return conditions; + } + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java new file mode 100644 index 0000000..bf84445 --- /dev/null +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java @@ -0,0 +1,604 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc; + +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; +import java.text.SimpleDateFormat; +import java.util.LinkedList; +import java.util.List; +import java.util.TimeZone; + +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration; +import org.apache.sqoop.job.etl.Partition; +import org.apache.sqoop.job.etl.Partitioner; +import org.apache.sqoop.job.etl.PartitionerContext; + +public class GenericJdbcPartitioner extends Partitioner<ConnectionConfiguration, FromJobConfiguration> { + + private static final BigDecimal NUMERIC_MIN_INCREMENT = new BigDecimal(10000 * Double.MIN_VALUE); + + + private long numberPartitions; + private String partitionColumnName; + private int partitionColumnType; + private String partitionMinValue; + private String partitionMaxValue; + private Boolean partitionColumnNull; + + @Override + public List<Partition> getPartitions(PartitionerContext context,ConnectionConfiguration connection, FromJobConfiguration job) { + List<Partition> partitions = new LinkedList<Partition>(); + + numberPartitions = context.getMaxPartitions(); + partitionColumnName = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME); + partitionColumnType = context.getInt(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, -1); + partitionMinValue = 
context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE); + partitionMaxValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE); + + partitionColumnNull = job.table.partitionColumnNull; + if (partitionColumnNull == null) { + partitionColumnNull = false; + } + + if (partitionMinValue == null && partitionMaxValue == null) { + GenericJdbcPartition partition = new GenericJdbcPartition(); + partition.setConditions(partitionColumnName + " IS NULL"); + partitions.add(partition); + return partitions; + } + + if (partitionColumnNull) { + GenericJdbcPartition partition = new GenericJdbcPartition(); + partition.setConditions(partitionColumnName + " IS NULL"); + partitions.add(partition); + numberPartitions -= 1; + } + + switch (partitionColumnType) { + case Types.TINYINT: + case Types.SMALLINT: + case Types.INTEGER: + case Types.BIGINT: + // Integer column + partitions.addAll(partitionIntegerColumn()); + break; + + case Types.REAL: + case Types.FLOAT: + case Types.DOUBLE: + // Floating point column + partitions.addAll(partitionFloatingPointColumn()); + break; + + case Types.NUMERIC: + case Types.DECIMAL: + // Decimal column + partitions.addAll(partitionNumericColumn()); + break; + + case Types.BIT: + case Types.BOOLEAN: + // Boolean column + return partitionBooleanColumn(); + + case Types.DATE: + case Types.TIME: + case Types.TIMESTAMP: + // Date time column + partitions.addAll(partitionDateTimeColumn()); + break; + + case Types.CHAR: + case Types.VARCHAR: + case Types.LONGVARCHAR: + // Text column + partitions.addAll(partitionTextColumn()); + break; + + default: + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011, + String.valueOf(partitionColumnType)); + } + + return partitions; + } + + protected List<Partition> partitionDateTimeColumn() { + List<Partition> partitions = new LinkedList<Partition>(); + + long minDateValue = 0; + long maxDateValue = 0; + SimpleDateFormat sdf = null; + 
switch(partitionColumnType) { + case Types.DATE: + sdf = new SimpleDateFormat("yyyy-MM-dd"); + minDateValue = Date.valueOf(partitionMinValue).getTime(); + maxDateValue = Date.valueOf(partitionMaxValue).getTime(); + break; + case Types.TIME: + sdf = new SimpleDateFormat("HH:mm:ss"); + minDateValue = Time.valueOf(partitionMinValue).getTime(); + maxDateValue = Time.valueOf(partitionMaxValue).getTime(); + break; + case Types.TIMESTAMP: + sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + minDateValue = Timestamp.valueOf(partitionMinValue).getTime(); + maxDateValue = Timestamp.valueOf(partitionMaxValue).getTime(); + break; + } + + + minDateValue += TimeZone.getDefault().getOffset(minDateValue); + maxDateValue += TimeZone.getDefault().getOffset(maxDateValue); + + sdf.setTimeZone(TimeZone.getTimeZone("GMT")); + + long interval = (maxDateValue - minDateValue) / numberPartitions; + long remainder = (maxDateValue - minDateValue) % numberPartitions; + + if (interval == 0) { + numberPartitions = (int)remainder; + } + + long lowerBound; + long upperBound = minDateValue; + + Object objLB = null; + Object objUB = null; + + for (int i = 1; i < numberPartitions; i++) { + lowerBound = upperBound; + upperBound = lowerBound + interval; + upperBound += (i <= remainder) ? 
1 : 0; + + switch(partitionColumnType) { + case Types.DATE: + objLB = new Date(lowerBound); + objUB = new Date(upperBound); + break; + case Types.TIME: + objLB = new Time(lowerBound); + objUB = new Time(upperBound); + + break; + case Types.TIMESTAMP: + objLB = new Timestamp(lowerBound); + objUB = new Timestamp(upperBound); + break; + } + + GenericJdbcPartition partition = new GenericJdbcPartition(); + partition.setConditions( + constructDateConditions(sdf, objLB, objUB, false)); + partitions.add(partition); + } + + switch(partitionColumnType) { + case Types.DATE: + objLB = new Date(upperBound); + objUB = new Date(maxDateValue); + break; + case Types.TIME: + objLB = new Time(upperBound); + objUB = new Time(maxDateValue); + break; + case Types.TIMESTAMP: + objLB = new Timestamp(upperBound); + objUB = new Timestamp(maxDateValue); + break; + } + + + GenericJdbcPartition partition = new GenericJdbcPartition(); + partition.setConditions( + constructDateConditions(sdf, objLB, objUB, true)); + partitions.add(partition); + return partitions; + } + + protected List<Partition> partitionTextColumn() { + List<Partition> partitions = new LinkedList<Partition>(); + + String minStringValue = null; + String maxStringValue = null; + + // Remove common prefix if any as it does not affect outcome. + int maxPrefixLen = Math.min(partitionMinValue.length(), + partitionMaxValue.length()); + // Calculate common prefix length + int cpLen = 0; + + for (cpLen = 0; cpLen < maxPrefixLen; cpLen++) { + char c1 = partitionMinValue.charAt(cpLen); + char c2 = partitionMaxValue.charAt(cpLen); + if (c1 != c2) { + break; + } + } + + // The common prefix has length 'cpLen'. Extract it from both. 
+ String prefix = partitionMinValue.substring(0, cpLen); + minStringValue = partitionMinValue.substring(cpLen); + maxStringValue = partitionMaxValue.substring(cpLen); + + BigDecimal minStringBD = textToBigDecimal(minStringValue); + BigDecimal maxStringBD = textToBigDecimal(maxStringValue); + + // Having one single value means that we can create only one single split + if(minStringBD.equals(maxStringBD)) { + GenericJdbcPartition partition = new GenericJdbcPartition(); + partition.setConditions(constructTextConditions(prefix, 0, 0, + partitionMinValue, partitionMaxValue, true, true)); + partitions.add(partition); + return partitions; + } + + // Get all the split points together. + List<BigDecimal> splitPoints = new LinkedList<BigDecimal>(); + + BigDecimal splitSize = divide(maxStringBD.subtract(minStringBD), + new BigDecimal(numberPartitions)); + if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) { + splitSize = NUMERIC_MIN_INCREMENT; + } + + BigDecimal curVal = minStringBD; + + int parts = 0; + + while (curVal.compareTo(maxStringBD) <= 0 && parts < numberPartitions) { + splitPoints.add(curVal); + curVal = curVal.add(splitSize); + // bigDecimalToText approximates to next comparison location. + // Make sure we are still in range + String text = bigDecimalToText(curVal); + curVal = textToBigDecimal(text); + ++parts; + } + + if (splitPoints.size() == 0 + || splitPoints.get(0).compareTo(minStringBD) != 0) { + splitPoints.add(0, minStringBD); + } + + if (splitPoints.get(splitPoints.size() - 1).compareTo(maxStringBD) != 0 + || splitPoints.size() == 1) { + splitPoints.add(maxStringBD); + } + + // Turn the split points into a set of string intervals. 
+ BigDecimal start = splitPoints.get(0); + for (int i = 1; i < splitPoints.size(); i++) { + BigDecimal end = splitPoints.get(i); + + GenericJdbcPartition partition = new GenericJdbcPartition(); + partition.setConditions(constructTextConditions(prefix, start, end, + partitionMinValue, partitionMaxValue, i == 1, i == splitPoints.size() - 1)); + partitions.add(partition); + + start = end; + } + + return partitions; + } + + + protected List<Partition> partitionIntegerColumn() { + List<Partition> partitions = new LinkedList<Partition>(); + + long minValue = partitionMinValue == null ? Long.MIN_VALUE + : Long.parseLong(partitionMinValue); + long maxValue = Long.parseLong(partitionMaxValue); + + long interval = (maxValue - minValue) / numberPartitions; + long remainder = (maxValue - minValue) % numberPartitions; + + if (interval == 0) { + numberPartitions = (int)remainder; + } + + long lowerBound; + long upperBound = minValue; + for (int i = 1; i < numberPartitions; i++) { + lowerBound = upperBound; + upperBound = lowerBound + interval; + upperBound += (i <= remainder) ? 1 : 0; + + GenericJdbcPartition partition = new GenericJdbcPartition(); + partition.setConditions( + constructConditions(lowerBound, upperBound, false)); + partitions.add(partition); + } + + GenericJdbcPartition partition = new GenericJdbcPartition(); + partition.setConditions( + constructConditions(upperBound, maxValue, true)); + partitions.add(partition); + + return partitions; + } + + protected List<Partition> partitionFloatingPointColumn() { + List<Partition> partitions = new LinkedList<Partition>(); + + + double minValue = partitionMinValue == null ? 
Double.MIN_VALUE + : Double.parseDouble(partitionMinValue); + double maxValue = Double.parseDouble(partitionMaxValue); + + double interval = (maxValue - minValue) / numberPartitions; + + double lowerBound; + double upperBound = minValue; + for (int i = 1; i < numberPartitions; i++) { + lowerBound = upperBound; + upperBound = lowerBound + interval; + + GenericJdbcPartition partition = new GenericJdbcPartition(); + partition.setConditions( + constructConditions(lowerBound, upperBound, false)); + partitions.add(partition); + } + + GenericJdbcPartition partition = new GenericJdbcPartition(); + partition.setConditions( + constructConditions(upperBound, maxValue, true)); + partitions.add(partition); + + return partitions; + } + + protected List<Partition> partitionNumericColumn() { + List<Partition> partitions = new LinkedList<Partition>(); + // Having one end in null is not supported + if (partitionMinValue == null || partitionMaxValue == null) { + throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0015); + } + + BigDecimal minValue = new BigDecimal(partitionMinValue); + BigDecimal maxValue = new BigDecimal(partitionMaxValue); + + // Having one single value means that we can create only one single split + if(minValue.equals(maxValue)) { + GenericJdbcPartition partition = new GenericJdbcPartition(); + partition.setConditions(constructConditions(minValue)); + partitions.add(partition); + return partitions; + } + + // Get all the split points together. 
+ List<BigDecimal> splitPoints = new LinkedList<BigDecimal>(); + + BigDecimal splitSize = divide(maxValue.subtract(minValue), new BigDecimal(numberPartitions)); + + if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) { + splitSize = NUMERIC_MIN_INCREMENT; + } + + BigDecimal curVal = minValue; + + while (curVal.compareTo(maxValue) <= 0) { + splitPoints.add(curVal); + curVal = curVal.add(splitSize); + } + + if (splitPoints.get(splitPoints.size() - 1).compareTo(maxValue) != 0 || splitPoints.size() == 1) { + splitPoints.remove(splitPoints.size() - 1); + splitPoints.add(maxValue); + } + + // Turn the split points into a set of intervals. + BigDecimal start = splitPoints.get(0); + for (int i = 1; i < splitPoints.size(); i++) { + BigDecimal end = splitPoints.get(i); + + GenericJdbcPartition partition = new GenericJdbcPartition(); + partition.setConditions(constructConditions(start, end, i == splitPoints.size() - 1)); + partitions.add(partition); + + start = end; + } + + return partitions; + } + + protected List<Partition> partitionBooleanColumn() { + List<Partition> partitions = new LinkedList<Partition>(); + + + Boolean minValue = parseBooleanValue(partitionMinValue); + Boolean maxValue = parseBooleanValue(partitionMaxValue); + + StringBuilder conditions = new StringBuilder(); + + // Having one single value means that we can create only one single split + if(minValue.equals(maxValue)) { + GenericJdbcPartition partition = new GenericJdbcPartition(); + + conditions.append(partitionColumnName).append(" = ") + .append(maxValue); + partition.setConditions(conditions.toString()); + partitions.add(partition); + return partitions; + } + + GenericJdbcPartition partition = new GenericJdbcPartition(); + + if (partitionMinValue == null) { + conditions = new StringBuilder(); + conditions.append(partitionColumnName).append(" IS NULL"); + partition.setConditions(conditions.toString()); + partitions.add(partition); + } + partition = new GenericJdbcPartition(); + conditions = new 
StringBuilder(); + conditions.append(partitionColumnName).append(" = TRUE"); + partition.setConditions(conditions.toString()); + partitions.add(partition); + partition = new GenericJdbcPartition(); + conditions = new StringBuilder(); + conditions.append(partitionColumnName).append(" = FALSE"); + partition.setConditions(conditions.toString()); + partitions.add(partition); + return partitions; + } + + private Boolean parseBooleanValue(String value) { + if (value == null) { + return null; + } + if (value.equals("1")) { + return Boolean.TRUE; + } else if (value.equals("0")) { + return Boolean.FALSE; + } else { + return Boolean.parseBoolean(value); + } + } + + protected BigDecimal divide(BigDecimal numerator, BigDecimal denominator) { + try { + return numerator.divide(denominator); + } catch (ArithmeticException ae) { + return numerator.divide(denominator, BigDecimal.ROUND_HALF_UP); + } + } + + protected String constructConditions( + Object lowerBound, Object upperBound, boolean lastOne) { + StringBuilder conditions = new StringBuilder(); + conditions.append(lowerBound); + conditions.append(" <= "); + conditions.append(partitionColumnName); + conditions.append(" AND "); + conditions.append(partitionColumnName); + conditions.append(lastOne ? " <= " : " < "); + conditions.append(upperBound); + return conditions.toString(); + } + + protected String constructConditions(Object value) { + return new StringBuilder() + .append(partitionColumnName) + .append(" = ") + .append(value) + .toString() + ; + } + + protected String constructDateConditions(SimpleDateFormat sdf, + Object lowerBound, Object upperBound, boolean lastOne) { + StringBuilder conditions = new StringBuilder(); + conditions.append('\'').append(sdf.format((java.util.Date)lowerBound)).append('\''); + conditions.append(" <= "); + conditions.append(partitionColumnName); + conditions.append(" AND "); + conditions.append(partitionColumnName); + conditions.append(lastOne ? 
" <= " : " < "); + conditions.append('\'').append(sdf.format((java.util.Date)upperBound)).append('\''); + return conditions.toString(); + } + + protected String constructTextConditions(String prefix, Object lowerBound, Object upperBound, + String lowerStringBound, String upperStringBound, boolean firstOne, boolean lastOne) { + StringBuilder conditions = new StringBuilder(); + String lbString = prefix + bigDecimalToText((BigDecimal)lowerBound); + String ubString = prefix + bigDecimalToText((BigDecimal)upperBound); + conditions.append('\'').append(firstOne ? lowerStringBound : lbString).append('\''); + conditions.append(" <= "); + conditions.append(partitionColumnName); + conditions.append(" AND "); + conditions.append(partitionColumnName); + conditions.append(lastOne ? " <= " : " < "); + conditions.append('\'').append(lastOne ? upperStringBound : ubString).append('\''); + return conditions.toString(); + } + + /** + * Converts a string to a BigDecimal representation in Base 2^21 format. + * The maximum Unicode code point value defined is 10FFFF. Although + * not all database system support UTF16 and mostly we expect UCS2 + * characters only, for completeness, we assume that all the unicode + * characters are supported. + * Given a string 's' containing characters s_0, s_1,..s_n, + * the string is interpreted as the number: 0.s_0 s_1 s_2 s_3 s_48) + * This can be split and each split point can be converted back to + * a string value for comparison purposes. The number of characters + * is restricted to prevent repeating fractions and rounding errors + * towards the higher fraction positions. 
+ */ + private static final BigDecimal UNITS_BASE = new BigDecimal(0x200000); + private static final int MAX_CHARS_TO_CONVERT = 4; + + private BigDecimal textToBigDecimal(String str) { + BigDecimal result = BigDecimal.ZERO; + BigDecimal divisor = UNITS_BASE; + + int len = Math.min(str.length(), MAX_CHARS_TO_CONVERT); + + for (int n = 0; n < len; ) { + int codePoint = str.codePointAt(n); + n += Character.charCount(codePoint); + BigDecimal val = divide(new BigDecimal(codePoint), divisor); + result = result.add(val); + divisor = divisor.multiply(UNITS_BASE); + } + + return result; + } + + private String bigDecimalToText(BigDecimal bd) { + BigDecimal curVal = bd.stripTrailingZeros(); + StringBuilder sb = new StringBuilder(); + + for (int n = 0; n < MAX_CHARS_TO_CONVERT; ++n) { + curVal = curVal.multiply(UNITS_BASE); + int cp = curVal.intValue(); + if (0 >= cp) { + break; + } + + if (!Character.isDefined(cp)) { + int t_cp = Character.MAX_CODE_POINT < cp ? 1 : cp; + // We are guaranteed to find at least one character + while(!Character.isDefined(t_cp)) { + ++t_cp; + if (t_cp == cp) { + break; + } + if (t_cp >= Character.MAX_CODE_POINT || t_cp <= 0) { + t_cp = 1; + } + } + cp = t_cp; + } + curVal = curVal.subtract(new BigDecimal(cp)); + sb.append(Character.toChars(cp)); + } + + return sb.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToDestroyer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToDestroyer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToDestroyer.java new file mode 100644 index 0000000..6be3e12 --- /dev/null +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToDestroyer.java @@ -0,0 +1,62 @@ +/** + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc; + +import org.apache.log4j.Logger; +import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration; +import org.apache.sqoop.job.etl.Destroyer; +import org.apache.sqoop.job.etl.DestroyerContext; + +public class GenericJdbcToDestroyer extends Destroyer<ConnectionConfiguration, ToJobConfiguration> { + + private static final Logger LOG = Logger.getLogger(GenericJdbcToDestroyer.class); + + @Override + public void destroy(DestroyerContext context, ConnectionConfiguration connection, ToJobConfiguration job) { + LOG.info("Running generic JDBC connector destroyer"); + + final String tableName = job.table.tableName; + final String stageTableName = job.table.stageTableName; + final boolean stageEnabled = stageTableName != null && + stageTableName.length() > 0; + if(stageEnabled) { + moveDataToDestinationTable(connection, + context.isSuccess(), stageTableName, tableName); + } + } + + private void moveDataToDestinationTable(ConnectionConfiguration connectorConf, + boolean success, String stageTableName, String tableName) { + GenericJdbcExecutor executor = + new 
GenericJdbcExecutor(connectorConf.connection.jdbcDriver, + connectorConf.connection.connectionString, + connectorConf.connection.username, + connectorConf.connection.password); + + if(success) { + LOG.info("Job completed, transferring data from stage table to " + + "destination table."); + executor.migrateData(stageTableName, tableName); + } else { + LOG.warn("Job failed, clearing stage table."); + executor.deleteTableData(stageTableName); + } + } + +}
