Repository: sqoop Updated Branches: refs/heads/SQOOP-1367 5c29a2a29 -> bfb0f2069
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bfb0f206/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java deleted file mode 100644 index cd834e8..0000000 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java +++ /dev/null @@ -1,404 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.connector.jdbc; - -import java.sql.Types; - -import junit.framework.TestCase; - -import org.apache.sqoop.common.MutableContext; -import org.apache.sqoop.common.MutableMapContext; -import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; -//import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration; -import org.apache.sqoop.job.Constants; -import org.apache.sqoop.job.etl.Initializer; -import org.apache.sqoop.job.etl.InitializerContext; -import org.apache.sqoop.schema.Schema; -import org.apache.sqoop.schema.type.FixedPoint; -import org.apache.sqoop.schema.type.FloatingPoint; -import org.apache.sqoop.schema.type.Text; - -public class TestImportInitializer extends TestCase { - -// private final String schemaName; -// private final String tableName; -// private final String schemalessTableName; -// private final String tableSql; -// private final String schemalessTableSql; -// private final String tableColumns; -// -// private GenericJdbcExecutor executor; -// -// private static final int START = -50; -// private static final int NUMBER_OF_ROWS = 101; -// -// public TestImportInitializer() { -// schemaName = getClass().getSimpleName().toUpperCase() + "SCHEMA"; -// tableName = getClass().getSimpleName().toUpperCase() + "TABLEWITHSCHEMA"; -// schemalessTableName = getClass().getSimpleName().toUpperCase() + "TABLE"; -// tableSql = "SELECT * FROM " + schemaName + "." + tableName + " WHERE ${CONDITIONS}"; -// schemalessTableSql = "SELECT * FROM " + schemalessTableName + " WHERE ${CONDITIONS}"; -// tableColumns = "ICOL,VCOL"; -// } -// -// @Override -// public void setUp() { -// executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER, -// GenericJdbcTestConstants.URL, null, null); -// -// String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); -// if (!executor.existTable(tableName)) { -// executor.executeUpdate("CREATE SCHEMA " + executor.delimitIdentifier(schemaName)); -// executor.executeUpdate("CREATE TABLE " -// + fullTableName -// + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); -// -// for (int i = 0; i < NUMBER_OF_ROWS; i++) { -// int value = START + i; -// String sql = "INSERT INTO " + fullTableName -// + " VALUES(" + value + ", " + value + ", '" + value + "')"; -// executor.executeUpdate(sql); -// } -// } -// -// fullTableName = executor.delimitIdentifier(schemalessTableName); -// if (!executor.existTable(schemalessTableName)) { -// executor.executeUpdate("CREATE TABLE " -// + fullTableName -// + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); -// -// for (int i = 0; i < NUMBER_OF_ROWS; i++) { -// int value = START + i; -// String sql = "INSERT INTO " + fullTableName -// + " VALUES(" + value + ", " + value + ", '" + value + "')"; -// executor.executeUpdate(sql); -// } -// } -// } -// -// /** -// * Return Schema representation for the testing fromTable. -// * -// * @param name Name that should be used for the generated schema. -// * @return -// */ -// public Schema getSchema(String name) { -// return new Schema(name) -// .addColumn(new FixedPoint("ICOL")) -// .addColumn(new FloatingPoint("DCOL")) -// .addColumn(new Text("VCOL")) -// ; -// } -// -// @Override -// public void tearDown() { -// executor.close(); -// } -// -// @SuppressWarnings("unchecked") -// public void testTableName() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.tableName = schemalessTableName; -// -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcImportInitializer(); -// initializer.initialize(initializerContext, connConf, jobConf); -// -// verifyResult(context, -// "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) -// + " WHERE ${CONDITIONS}", -// "ICOL,DCOL,VCOL", -// "ICOL", -// String.valueOf(Types.INTEGER), -// String.valueOf(START), -// String.valueOf(START+NUMBER_OF_ROWS-1)); -// } -// -// @SuppressWarnings("unchecked") -// public void testTableNameWithTableColumns() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.tableName = schemalessTableName; -// jobConf.fromTable.columns = tableColumns; -// -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcImportInitializer(); -// initializer.initialize(initializerContext, connConf, jobConf); -// -// verifyResult(context, -// "SELECT ICOL,VCOL FROM " + executor.delimitIdentifier(schemalessTableName) -// + " WHERE ${CONDITIONS}", -// tableColumns, -// "ICOL", -// String.valueOf(Types.INTEGER), -// String.valueOf(START), -// String.valueOf(START+NUMBER_OF_ROWS-1)); -// } -// -// @SuppressWarnings("unchecked") -// public void testTableSql() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.sql = schemalessTableSql; -// jobConf.fromTable.partitionColumn = "DCOL"; -// -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcImportInitializer(); -// initializer.initialize(initializerContext, connConf, jobConf); -// -// verifyResult(context, -// "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) -// + " WHERE ${CONDITIONS}", -// "ICOL,DCOL,VCOL", -// "DCOL", -// String.valueOf(Types.DOUBLE), -// String.valueOf((double)START), -// String.valueOf((double)(START+NUMBER_OF_ROWS-1))); -// } -// -// @SuppressWarnings("unchecked") -// public void testTableSqlWithTableColumns() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.sql = schemalessTableSql; -// jobConf.fromTable.columns = tableColumns; -// jobConf.fromTable.partitionColumn = "DCOL"; -// -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcImportInitializer(); -// initializer.initialize(initializerContext, connConf, jobConf); -// -// verifyResult(context, -// "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM " -// + "(SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) -// + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS", -// tableColumns, -// "DCOL", -// String.valueOf(Types.DOUBLE), -// String.valueOf((double)START), -// String.valueOf((double)(START+NUMBER_OF_ROWS-1))); -// } -// -// @SuppressWarnings("unchecked") -// public void testTableNameWithSchema() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.schemaName = schemaName; -// jobConf.fromTable.tableName = tableName; -// -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcImportInitializer(); -// initializer.initialize(initializerContext, connConf, jobConf); -// -// verifyResult(context, -// "SELECT * FROM " + fullTableName -// + " WHERE ${CONDITIONS}", -// "ICOL,DCOL,VCOL", -// "ICOL", -// String.valueOf(Types.INTEGER), -// String.valueOf(START), -// String.valueOf(START+NUMBER_OF_ROWS-1)); -// } -// -// @SuppressWarnings("unchecked") -// public void testTableNameWithTableColumnsWithSchema() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.schemaName = schemaName; -// jobConf.fromTable.tableName = tableName; -// jobConf.fromTable.columns = tableColumns; -// -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcImportInitializer(); -// initializer.initialize(initializerContext, connConf, jobConf); -// -// verifyResult(context, -// "SELECT ICOL,VCOL FROM " + fullTableName -// + " WHERE ${CONDITIONS}", -// tableColumns, -// "ICOL", -// String.valueOf(Types.INTEGER), -// String.valueOf(START), -// String.valueOf(START+NUMBER_OF_ROWS-1)); -// } -// -// @SuppressWarnings("unchecked") -// public void testTableSqlWithSchema() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.schemaName = schemaName; -// jobConf.fromTable.sql = tableSql; -// jobConf.fromTable.partitionColumn = "DCOL"; -// -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcImportInitializer(); -// initializer.initialize(initializerContext, connConf, jobConf); -// -// verifyResult(context, -// "SELECT * FROM " + fullTableName -// + " WHERE ${CONDITIONS}", -// "ICOL,DCOL,VCOL", -// "DCOL", -// String.valueOf(Types.DOUBLE), -// String.valueOf((double)START), -// String.valueOf((double)(START+NUMBER_OF_ROWS-1))); -// } -// -// -// @SuppressWarnings("unchecked") -// public void testGetSchemaForTable() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.schemaName = schemaName; -// jobConf.fromTable.tableName = tableName; -// jobConf.fromTable.partitionColumn = "DCOL"; -// -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcImportInitializer(); -// initializer.initialize(initializerContext, connConf, jobConf); -// Schema schema = initializer.getSchema(initializerContext, connConf, jobConf); -// assertEquals(getSchema(jobConf.table.schemaName + "." + tableName), schema); -// } -// -// @SuppressWarnings("unchecked") -// public void testGetSchemaForSql() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.schemaName = schemaName; -// jobConf.fromTable.sql = tableSql; -// jobConf.fromTable.partitionColumn = "DCOL"; -// -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcImportInitializer(); -// initializer.initialize(initializerContext, connConf, jobConf); -// Schema schema = initializer.getSchema(initializerContext, connConf, jobConf); -// assertEquals(getSchema("Query"), schema); -// } -// -// @SuppressWarnings("unchecked") -// public void testTableSqlWithTableColumnsWithSchema() throws Exception { -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); -// -// connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; -// connConf.connection.connectionString = GenericJdbcTestConstants.URL; -// jobConf.fromTable.schemaName = schemaName; -// jobConf.fromTable.sql = tableSql; -// jobConf.fromTable.columns = tableColumns; -// jobConf.fromTable.partitionColumn = "DCOL"; -// -// MutableContext context = new MutableMapContext(); -// InitializerContext initializerContext = new InitializerContext(context); -// -// @SuppressWarnings("rawtypes") -// Initializer initializer = new GenericJdbcImportInitializer(); -// initializer.initialize(initializerContext, connConf, jobConf); -// -// verifyResult(context, -// "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM " -// + "(SELECT * FROM " + fullTableName -// + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS", -// tableColumns, -// "DCOL", -// String.valueOf(Types.DOUBLE), -// String.valueOf((double)START), -// String.valueOf((double)(START+NUMBER_OF_ROWS-1))); -// } -// -// private void verifyResult(MutableContext context, -// String dataSql, String fieldNames, -// String partitionColumnName, String partitionColumnType, -// String partitionMinValue, String partitionMaxValue) { -// assertEquals(dataSql, context.getString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL)); -// assertEquals(fieldNames, context.getString( -// Constants.JOB_ETL_FIELD_NAMES)); -// -// assertEquals(partitionColumnName, context.getString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME)); -// assertEquals(partitionColumnType, context.getString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE)); -// assertEquals(partitionMinValue, context.getString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE)); -// assertEquals(partitionMaxValue, context.getString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE)); -// } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/bfb0f206/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java deleted file mode 100644 index 958f75f..0000000 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java +++ /dev/null @@ -1,505 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.connector.jdbc; - -import java.math.BigDecimal; -import java.sql.Date; -import java.sql.Time; -import java.sql.Timestamp; -import java.sql.Types; -import java.util.Iterator; -import java.util.List; - -import junit.framework.TestCase; - -import org.apache.sqoop.common.MutableContext; -import org.apache.sqoop.common.MutableMapContext; -import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; -//import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration; -import org.apache.sqoop.job.Constants; -import org.apache.sqoop.job.etl.Partition; -import org.apache.sqoop.job.etl.Partitioner; -import org.apache.sqoop.job.etl.PartitionerContext; - -public class TestImportPartitioner extends TestCase { - -// private static final int START = -5; -// private static final int NUMBER_OF_ROWS = 11; -// -// public void testIntegerEvenPartition() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, -// "ICOL"); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, -// String.valueOf(Types.INTEGER)); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, -// String.valueOf(START)); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, -// String.valueOf(START + NUMBER_OF_ROWS - 1)); -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); -// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// -// verifyResult(partitions, new String[] { -// "-5 <= ICOL AND ICOL < -3", -// "-3 <= ICOL AND ICOL < -1", -// "-1 <= ICOL AND ICOL < 1", -// "1 <= ICOL AND ICOL < 3", -// "3 <= ICOL AND ICOL <= 5" -// }); -// } -// -// public void testIntegerUnevenPartition() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, -// "ICOL"); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, -// String.valueOf(Types.INTEGER)); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, -// String.valueOf(START)); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, -// String.valueOf(START + NUMBER_OF_ROWS - 1)); -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); -// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// -// verifyResult(partitions, new String[] { -// "-5 <= ICOL AND ICOL < -1", -// "-1 <= ICOL AND ICOL < 2", -// "2 <= ICOL AND ICOL <= 5" -// }); -// } -// -// public void testIntegerOverPartition() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, -// "ICOL"); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, -// String.valueOf(Types.INTEGER)); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, -// String.valueOf(START)); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, -// String.valueOf(START + NUMBER_OF_ROWS - 1)); -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 13, null); -// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// -// verifyResult(partitions, new String[] { -// "-5 <= ICOL AND ICOL < -4", -// "-4 <= ICOL AND ICOL < -3", -// "-3 <= ICOL AND ICOL < -2", -// "-2 <= ICOL AND ICOL < -1", -// "-1 <= ICOL AND ICOL < 0", -// "0 <= ICOL AND ICOL < 1", -// "1 <= ICOL AND ICOL < 2", -// "2 <= ICOL AND ICOL < 3", -// "3 <= ICOL AND ICOL < 4", -// "4 <= ICOL AND ICOL <= 5" -// }); -// } -// -// public void testFloatingPointEvenPartition() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, -// "DCOL"); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, -// String.valueOf(Types.DOUBLE)); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, -// String.valueOf((double)START)); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, -// String.valueOf((double)(START + NUMBER_OF_ROWS - 1))); -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); -// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// -// verifyResult(partitions, new String[] { -// "-5.0 <= DCOL AND DCOL < -3.0", -// "-3.0 <= DCOL AND DCOL < -1.0", -// "-1.0 <= DCOL AND DCOL < 1.0", -// "1.0 <= DCOL AND DCOL < 3.0", -// "3.0 <= DCOL AND DCOL <= 5.0" -// }); -// } -// -// public void testFloatingPointUnevenPartition() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, -// "DCOL"); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, -// String.valueOf(Types.DOUBLE)); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, -// String.valueOf((double)START)); -// context.setString( -// GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, -// String.valueOf((double)(START + NUMBER_OF_ROWS - 1))); -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); -// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// -// verifyResult(partitions, new String[] { -// "-5.0 <= DCOL AND DCOL < -1.6666666666666665", -// "-1.6666666666666665 <= DCOL AND DCOL < 1.666666666666667", -// "1.666666666666667 <= DCOL AND DCOL <= 5.0" -// }); -// } -// -// public void testNumericEvenPartition() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "ICOL"); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC)); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(START)); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(START + NUMBER_OF_ROWS - 1)); -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); -// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// -// verifyResult(partitions, new String[] { -// "-5 <= ICOL AND ICOL < -3", -// "-3 <= ICOL AND ICOL < -1", -// "-1 <= ICOL AND ICOL < 1", -// "1 <= ICOL AND ICOL < 3", -// "3 <= ICOL AND ICOL <= 5" -// }); -// } -// -// public void testNumericUnevenPartition() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "DCOL"); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC)); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(new BigDecimal(START))); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(new BigDecimal(START + NUMBER_OF_ROWS - 1))); -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); -// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// -// verifyResult(partitions, new String[]{ -// "-5 <= DCOL AND DCOL < -2", -// "-2 <= DCOL AND DCOL < 1", -// "1 <= DCOL AND DCOL <= 5" -// }); -// } -// -// public void testNumericSinglePartition() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "DCOL"); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC)); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(new BigDecimal(START))); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(new BigDecimal(START))); -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); -// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// -// verifyResult(partitions, new String[]{ -// "DCOL = -5", -// }); -// } -// -// -// public void testDatePartition() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "DCOL"); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.DATE)); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, -// Date.valueOf("2004-10-20").toString()); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_MAXVALUE, Date.valueOf("2013-10-17") -// .toString()); -// -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); -// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// -// -// verifyResult(partitions, new String[]{ -// "'2004-10-20' <= DCOL AND DCOL < '2007-10-19'", -// "'2007-10-19' <= DCOL AND DCOL < '2010-10-18'", -// "'2010-10-18' <= DCOL AND DCOL <= '2013-10-17'", -// }); -// -// } -// -// public void testTimePartition() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "TCOL"); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.TIME)); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, -// Time.valueOf("01:01:01").toString()); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, -// Time.valueOf("10:40:50").toString()); -// -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); -// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// -// verifyResult(partitions, new String[]{ -// "'01:01:01' <= TCOL AND TCOL < '04:14:17'", -// "'04:14:17' <= TCOL AND TCOL < '07:27:33'", -// "'07:27:33' <= TCOL AND TCOL <= '10:40:50'", -// }); -// } -// -// public void testTimestampPartition() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "TSCOL"); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.TIMESTAMP)); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, -// Timestamp.valueOf("2013-01-01 01:01:01.123").toString()); -// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, -// Timestamp.valueOf("2013-12-31 10:40:50.654").toString()); -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); -// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// verifyResult(partitions, new String[]{ -// "'2013-01-01 01:01:01.123' <= TSCOL AND TSCOL < '2013-05-02 12:14:17.634'", -// "'2013-05-02 12:14:17.634' <= TSCOL AND TSCOL < '2013-08-31 23:27:34.144'", -// "'2013-08-31 23:27:34.144' <= TSCOL AND TSCOL <= '2013-12-31 10:40:50.654'", -// }); -// } -// -// public void testBooleanPartition() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "BCOL"); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.BOOLEAN)); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_MINVALUE, "0"); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_MAXVALUE, "1"); -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); -// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// verifyResult(partitions, new String[]{ -// "BCOL = TRUE", -// "BCOL = FALSE", -// }); -// } -// -// public void testVarcharPartition() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL"); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR)); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_MINVALUE, "A"); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_MAXVALUE, "Z"); -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 25, null); -// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// -// verifyResult(partitions, new String[] { -// "'A' <= VCCOL AND VCCOL < 'B'", -// "'B' <= VCCOL AND VCCOL < 'C'", -// "'C' <= VCCOL AND VCCOL < 'D'", -// "'D' <= VCCOL AND VCCOL < 'E'", -// "'E' <= VCCOL AND VCCOL < 'F'", -// "'F' <= VCCOL AND VCCOL < 'G'", -// "'G' <= VCCOL AND VCCOL < 'H'", -// "'H' <= VCCOL AND VCCOL < 'I'", -// "'I' <= VCCOL AND VCCOL < 'J'", -// "'J' <= VCCOL AND VCCOL < 'K'", -// "'K' <= VCCOL AND VCCOL < 'L'", -// "'L' <= VCCOL AND VCCOL < 'M'", -// "'M' <= VCCOL AND VCCOL < 'N'", -// "'N' <= VCCOL AND VCCOL < 'O'", -// "'O' <= VCCOL AND VCCOL < 'P'", -// "'P' <= VCCOL AND VCCOL < 'Q'", -// "'Q' <= VCCOL AND VCCOL < 'R'", -// "'R' <= VCCOL AND VCCOL < 'S'", -// "'S' <= VCCOL AND VCCOL < 'T'", -// "'T' <= VCCOL AND VCCOL < 'U'", -// "'U' <= VCCOL AND VCCOL < 'V'", -// "'V' <= VCCOL AND VCCOL < 'W'", -// "'W' <= VCCOL AND VCCOL < 'X'", -// "'X' <= VCCOL AND VCCOL < 'Y'", -// "'Y' <= VCCOL AND VCCOL <= 'Z'", -// }); -// } -// -// public void testVarcharPartition2() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL"); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR)); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_MINVALUE, "Breezy Badger"); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_MAXVALUE, "Warty Warthog"); -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); -// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// assertEquals(partitions.size(), 5); -// // First partition needs to contain entire upper bound -// assertTrue(partitions.get(0).toString().contains("Breezy Badger")); -// // Last partition needs to contain entire lower bound -// assertTrue(partitions.get(4).toString().contains("Warty Warthog")); -// } -// -// public void testVarcharPartitionWithCommonPrefix() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL"); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR)); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_MINVALUE, "AAA"); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_MAXVALUE, "AAF"); -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); -// -// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// -// verifyResult(partitions, new String[] { -// "'AAA' <= VCCOL AND VCCOL < 'AAB'", -// "'AAB' <= VCCOL AND VCCOL < 'AAC'", -// "'AAC' <= VCCOL AND VCCOL < 'AAD'", -// "'AAD' <= VCCOL AND VCCOL < 'AAE'", -// "'AAE' <= VCCOL AND VCCOL <= 'AAF'", -// }); -// -// } -// -// public void testPatitionWithNullValues() throws Exception { -// MutableContext context = new MutableMapContext(); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL"); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR)); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_MINVALUE, "AAA"); -// context.setString(GenericJdbcConnectorConstants -// .CONNECTOR_JDBC_PARTITION_MAXVALUE, "AAE"); -// -// ConnectionConfiguration connConf = new ConnectionConfiguration(); -// ImportJobConfiguration jobConf = new ImportJobConfiguration(); -// jobConf.fromTable.partitionColumnNull = true; -// -// Partitioner partitioner = new GenericJdbcImportPartitioner(); -// PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); -// -// List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); -// -// verifyResult(partitions, new String[] { -// "VCCOL IS NULL", -// "'AAA' <= VCCOL AND VCCOL < 'AAB'", -// "'AAB' <= VCCOL AND VCCOL < 'AAC'", -// "'AAC' <= VCCOL AND VCCOL < 'AAD'", -// "'AAD' <= VCCOL AND VCCOL <= 'AAE'", -// }); -// -// } -// -// private void verifyResult(List<Partition> partitions, -// String[] expected) { -// assertEquals(expected.length, partitions.size()); -// -// Iterator<Partition> iterator = partitions.iterator(); -// for (int i = 0; i < expected.length; i++) { -// assertEquals(expected[i], -// ((GenericJdbcImportPartition)iterator.next()).getConditions()); -// } -// } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/bfb0f206/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java new file mode 100644 index 0000000..d7e8c6c --- /dev/null +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.sql.ResultSet; +import java.util.Arrays; +import java.util.Collection; + +import org.apache.sqoop.common.MutableContext; +import org.apache.sqoop.common.MutableMapContext; +import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration; +import org.apache.sqoop.etl.io.DataReader; +import org.apache.sqoop.job.etl.Loader; +import org.apache.sqoop.job.etl.LoaderContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class TestLoader { + + private final String tableName; + + private GenericJdbcExecutor executor; + + private static final int START = -50; + + private int numberOfRows; + + @Parameters + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][] {{50}, {100}, {101}, {150}, {200}}); + } + + public TestLoader(int numberOfRows) { + this.numberOfRows = numberOfRows; + tableName = getClass().getSimpleName().toUpperCase(); + } + + @Before + public void setUp() { + executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER, + GenericJdbcTestConstants.URL, null, null); + + if (!executor.existTable(tableName)) { + executor.executeUpdate("CREATE TABLE " + + executor.delimitIdentifier(tableName) + + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); + } else { + executor.deleteTableData(tableName); + } + } + + @After + public void tearDown() { + executor.close(); + } + + @Test + public void testInsert() throws Exception { + MutableContext context = new MutableMapContext(); + + ConnectionConfiguration connectionConfig = new ConnectionConfiguration(); + + connectionConfig.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connectionConfig.connection.connectionString = GenericJdbcTestConstants.URL; + + ToJobConfiguration jobConfig = new ToJobConfiguration(); + + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL, + "INSERT INTO " + executor.delimitIdentifier(tableName) + " VALUES (?,?,?)"); + + Loader loader = new GenericJdbcLoader(); + DummyReader reader = new DummyReader(); + LoaderContext loaderContext = new LoaderContext(context, reader, null); + loader.load(loaderContext, connectionConfig, jobConfig); + + int index = START; + ResultSet rs = executor.executeQuery("SELECT * FROM " + + executor.delimitIdentifier(tableName) + " ORDER BY ICOL"); + while (rs.next()) { + assertEquals(index, rs.getObject(1)); + assertEquals((double) index, rs.getObject(2)); + assertEquals(String.valueOf(index), rs.getObject(3)); + index++; + } + assertEquals(numberOfRows, index-START); + } + + public class DummyReader extends DataReader { + int index = 0; + + @Override + public Object[] readArrayRecord() { + if (index < numberOfRows) { + Object[] array = new Object[] { + START + index, + (double) (START + index), + String.valueOf(START+index) }; + index++; + return array; + } else { + return null; + } + } + + @Override + public String readTextRecord() { + fail("This method should not be invoked."); + return null; + } + + @Override + public Object readContent() throws Exception { + fail("This method should not be invoked."); + return null; + } + + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/bfb0f206/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java new file mode 100644 index 0000000..f1023c8 --- /dev/null +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java @@ -0,0 +1,503 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc; + +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.Iterator; +import java.util.List; + +import junit.framework.TestCase; + +import org.apache.sqoop.common.MutableContext; +import org.apache.sqoop.common.MutableMapContext; +import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration; +import org.apache.sqoop.job.etl.Partition; +import org.apache.sqoop.job.etl.Partitioner; +import org.apache.sqoop.job.etl.PartitionerContext; + +public class TestPartitioner extends TestCase { + + private static final int START = -5; + private static final int NUMBER_OF_ROWS = 11; + + public void testIntegerEvenPartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, + "ICOL"); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, + String.valueOf(Types.INTEGER)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, + String.valueOf(START)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, + String.valueOf(START + NUMBER_OF_ROWS - 1)); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); + List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + + verifyResult(partitions, new String[] { + "-5 <= ICOL AND ICOL < -3", + "-3 <= ICOL AND ICOL < -1", + "-1 <= ICOL AND ICOL < 1", + "1 <= ICOL AND ICOL < 3", + "3 <= ICOL AND ICOL <= 5" + }); + } + + public void testIntegerUnevenPartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, + "ICOL"); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, + String.valueOf(Types.INTEGER)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, + String.valueOf(START)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, + String.valueOf(START + NUMBER_OF_ROWS - 1)); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); + List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + + verifyResult(partitions, new String[] { + "-5 <= ICOL AND ICOL < -1", + "-1 <= ICOL AND ICOL < 2", + "2 <= ICOL AND ICOL <= 5" + }); + } + + public void testIntegerOverPartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, + "ICOL"); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, + String.valueOf(Types.INTEGER)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, + String.valueOf(START)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, + String.valueOf(START + NUMBER_OF_ROWS - 1)); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 13, null); + List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + + verifyResult(partitions, new String[] { + "-5 <= ICOL AND ICOL < -4", + "-4 <= ICOL AND ICOL < -3", + "-3 <= ICOL AND ICOL < -2", + "-2 <= ICOL AND ICOL < -1", + "-1 <= ICOL AND ICOL < 0", + "0 <= ICOL AND ICOL < 1", + "1 <= ICOL AND ICOL < 2", + "2 <= ICOL AND ICOL < 3", + "3 <= ICOL AND ICOL < 4", + "4 <= ICOL AND ICOL <= 5" + }); + } + + public void testFloatingPointEvenPartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, + "DCOL"); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, + String.valueOf(Types.DOUBLE)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, + String.valueOf((double)START)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, + String.valueOf((double)(START + NUMBER_OF_ROWS - 1))); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); + List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + + verifyResult(partitions, new String[] { + "-5.0 <= DCOL AND DCOL < -3.0", + "-3.0 <= DCOL AND DCOL < -1.0", + "-1.0 <= DCOL AND DCOL < 1.0", + "1.0 <= DCOL AND DCOL < 3.0", + "3.0 <= DCOL AND DCOL <= 5.0" + }); + } + + public void testFloatingPointUnevenPartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, + "DCOL"); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, + String.valueOf(Types.DOUBLE)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, + String.valueOf((double)START)); + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, + String.valueOf((double)(START + NUMBER_OF_ROWS - 1))); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); + List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + + verifyResult(partitions, new String[] { + "-5.0 <= DCOL AND DCOL < -1.6666666666666665", + "-1.6666666666666665 <= DCOL AND DCOL < 1.666666666666667", + "1.666666666666667 <= DCOL AND DCOL <= 5.0" + }); + } + + public void testNumericEvenPartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "ICOL"); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC)); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(START)); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(START + NUMBER_OF_ROWS - 1)); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); + List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + + verifyResult(partitions, new String[] { + "-5 <= ICOL AND ICOL < -3", + "-3 <= ICOL AND ICOL < -1", + "-1 <= ICOL AND ICOL < 1", + "1 <= ICOL AND ICOL < 3", + "3 <= ICOL AND ICOL <= 5" + }); + } + + public void testNumericUnevenPartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "DCOL"); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC)); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(new BigDecimal(START))); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(new BigDecimal(START + NUMBER_OF_ROWS - 1))); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); + List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + + verifyResult(partitions, new String[]{ + "-5 <= DCOL AND DCOL < -2", + "-2 <= DCOL AND DCOL < 1", + "1 <= DCOL AND DCOL <= 5" + }); + } + + public void testNumericSinglePartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "DCOL"); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC)); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(new BigDecimal(START))); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(new BigDecimal(START))); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); + List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + + verifyResult(partitions, new String[]{ + "DCOL = -5", + }); + } + + + public void testDatePartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "DCOL"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.DATE)); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, + Date.valueOf("2004-10-20").toString()); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MAXVALUE, Date.valueOf("2013-10-17") + .toString()); + + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); + List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + + + verifyResult(partitions, new String[]{ + "'2004-10-20' <= DCOL AND DCOL < '2007-10-19'", + "'2007-10-19' <= DCOL AND DCOL < '2010-10-18'", + "'2010-10-18' <= DCOL AND DCOL <= '2013-10-17'", + }); + + } + + public void testTimePartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "TCOL"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.TIME)); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, + Time.valueOf("01:01:01").toString()); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, + Time.valueOf("10:40:50").toString()); + + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); + List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + + verifyResult(partitions, new String[]{ + "'01:01:01' <= TCOL AND TCOL < '04:14:17'", + "'04:14:17' <= TCOL AND TCOL < '07:27:33'", + "'07:27:33' <= TCOL AND TCOL <= '10:40:50'", + }); + } + + public void testTimestampPartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "TSCOL"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.TIMESTAMP)); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, + Timestamp.valueOf("2013-01-01 01:01:01.123").toString()); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, + Timestamp.valueOf("2013-12-31 10:40:50.654").toString()); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); + List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + verifyResult(partitions, new String[]{ + "'2013-01-01 01:01:01.123' <= TSCOL AND TSCOL < '2013-05-02 12:14:17.634'", + "'2013-05-02 12:14:17.634' <= TSCOL AND TSCOL < '2013-08-31 23:27:34.144'", + "'2013-08-31 23:27:34.144' <= TSCOL AND TSCOL <= '2013-12-31 10:40:50.654'", + }); + } + + public void testBooleanPartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "BCOL"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.BOOLEAN)); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MINVALUE, "0"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MAXVALUE, "1"); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 3, null); + List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + verifyResult(partitions, new String[]{ + "BCOL = TRUE", + "BCOL = FALSE", + }); + } + + public void testVarcharPartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR)); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MINVALUE, "A"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MAXVALUE, "Z"); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 25, null); + List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + + verifyResult(partitions, new String[] { + "'A' <= VCCOL AND VCCOL < 'B'", + "'B' <= VCCOL AND VCCOL < 'C'", + "'C' <= VCCOL AND VCCOL < 'D'", + "'D' <= VCCOL AND VCCOL < 'E'", + "'E' <= VCCOL AND VCCOL < 'F'", + "'F' <= VCCOL AND VCCOL < 'G'", + "'G' <= VCCOL AND VCCOL < 'H'", + "'H' <= VCCOL AND VCCOL < 'I'", + "'I' <= VCCOL AND VCCOL < 'J'", + "'J' <= VCCOL AND VCCOL < 'K'", + "'K' <= VCCOL AND VCCOL < 'L'", + "'L' <= VCCOL AND VCCOL < 'M'", + "'M' <= VCCOL AND VCCOL < 'N'", + "'N' <= VCCOL AND VCCOL < 'O'", + "'O' <= VCCOL AND VCCOL < 'P'", + "'P' <= VCCOL AND VCCOL < 'Q'", + "'Q' <= VCCOL AND VCCOL < 'R'", + "'R' <= VCCOL AND VCCOL < 'S'", + "'S' <= VCCOL AND VCCOL < 'T'", + "'T' <= VCCOL AND VCCOL < 'U'", + "'U' <= VCCOL AND VCCOL < 'V'", + "'V' <= VCCOL AND VCCOL < 'W'", + "'W' <= VCCOL AND VCCOL < 'X'", + "'X' <= VCCOL AND VCCOL < 'Y'", + "'Y' <= VCCOL AND VCCOL <= 'Z'", + }); + } + + public void testVarcharPartition2() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR)); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MINVALUE, "Breezy Badger"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MAXVALUE, "Warty Warthog"); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); + List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + assertEquals(partitions.size(), 5); + // First partition needs to contain entire upper bound + assertTrue(partitions.get(0).toString().contains("Breezy Badger")); + // Last partition needs to contain entire lower bound + assertTrue(partitions.get(4).toString().contains("Warty Warthog")); + } + + public void testVarcharPartitionWithCommonPrefix() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR)); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MINVALUE, "AAA"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MAXVALUE, "AAF"); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); + + List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + + verifyResult(partitions, new String[] { + "'AAA' <= VCCOL AND VCCOL < 'AAB'", + "'AAB' <= VCCOL AND VCCOL < 'AAC'", + "'AAC' <= VCCOL AND VCCOL < 'AAD'", + "'AAD' <= VCCOL AND VCCOL < 'AAE'", + "'AAE' <= VCCOL AND VCCOL <= 'AAF'", + }); + + } + + public void testPatitionWithNullValues() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR)); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MINVALUE, "AAA"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MAXVALUE, "AAE"); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + FromJobConfiguration jobConf = new FromJobConfiguration(); + jobConf.fromTable.partitionColumnNull = true; + + Partitioner partitioner = new GenericJdbcPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, 5, null); + + List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + + verifyResult(partitions, new String[] { + "VCCOL IS NULL", + "'AAA' <= VCCOL AND VCCOL < 'AAB'", + "'AAB' <= VCCOL AND VCCOL < 'AAC'", + "'AAC' <= VCCOL AND VCCOL < 'AAD'", + "'AAD' <= VCCOL AND VCCOL <= 'AAE'", + }); + + } + + private void verifyResult(List<Partition> partitions, + String[] expected) { + assertEquals(expected.length, partitions.size()); + + Iterator<Partition> iterator = partitions.iterator(); + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], + ((GenericJdbcPartition)iterator.next()).getConditions()); + } + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/bfb0f206/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java new file mode 100644 index 0000000..4831cf8 --- /dev/null +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java @@ -0,0 +1,362 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc; + +import junit.framework.TestCase; +import org.apache.sqoop.common.MutableContext; +import org.apache.sqoop.common.MutableMapContext; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration; +import org.apache.sqoop.job.etl.Initializer; +import org.apache.sqoop.job.etl.InitializerContext; +import org.apache.sqoop.validation.Status; +import org.apache.sqoop.validation.Validation; + +public class TestToInitializer extends TestCase { + private final String schemaName; + private final String tableName; + private final String schemalessTableName; + private final String stageTableName; + private final String tableSql; + private final String schemalessTableSql; + private final String tableColumns; + + private GenericJdbcExecutor executor; + + public TestToInitializer() { + schemaName = getClass().getSimpleName().toUpperCase() + "SCHEMA"; + tableName = getClass().getSimpleName().toUpperCase() + "TABLEWITHSCHEMA"; + schemalessTableName = getClass().getSimpleName().toUpperCase() + "TABLE"; + stageTableName = getClass().getSimpleName().toUpperCase() + + "_STAGE_TABLE"; + tableSql = "INSERT INTO " + tableName + " VALUES (?,?,?)"; + schemalessTableSql = "INSERT INTO " + schemalessTableName + " VALUES (?,?,?)"; + tableColumns = "ICOL,VCOL"; + } + + @Override + public void setUp() { + executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER, + GenericJdbcTestConstants.URL, null, null); + + String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); + if (!executor.existTable(tableName)) { + executor.executeUpdate("CREATE SCHEMA " + executor.delimitIdentifier(schemaName)); + executor.executeUpdate("CREATE TABLE " + fullTableName + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); + } + + fullTableName = executor.delimitIdentifier(schemalessTableName); + if (!executor.existTable(schemalessTableName)) { + executor.executeUpdate("CREATE TABLE " + fullTableName + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); + } + } + + @Override + public void tearDown() { + executor.close(); + } + + @SuppressWarnings("unchecked") + public void testTableName() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ToJobConfiguration jobConf = new ToJobConfiguration(); + + String fullTableName = executor.delimitIdentifier(schemalessTableName); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.toTable.tableName = schemalessTableName; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcToInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, "INSERT INTO " + fullTableName + " VALUES (?,?,?)"); + } + + @SuppressWarnings("unchecked") + public void testTableNameWithTableColumns() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ToJobConfiguration jobConf = new ToJobConfiguration(); + + String fullTableName = executor.delimitIdentifier(schemalessTableName); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.toTable.tableName = schemalessTableName; + jobConf.toTable.columns = tableColumns; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcToInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, "INSERT INTO " + fullTableName + " (" + tableColumns + ") VALUES (?,?)"); + } + + @SuppressWarnings("unchecked") + public void testTableSql() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ToJobConfiguration jobConf = new ToJobConfiguration(); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.toTable.sql = schemalessTableSql; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcToInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, "INSERT INTO " + executor.delimitIdentifier(schemalessTableName) + " VALUES (?,?,?)"); + } + + @SuppressWarnings("unchecked") + public void testTableNameWithSchema() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ToJobConfiguration jobConf = new ToJobConfiguration(); + + String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.toTable.schemaName = schemaName; + jobConf.toTable.tableName = tableName; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcToInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, "INSERT INTO " + fullTableName + " VALUES (?,?,?)"); + } + + @SuppressWarnings("unchecked") + public void testTableNameWithTableColumnsWithSchema() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ToJobConfiguration jobConf = new ToJobConfiguration(); + + String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.toTable.schemaName = schemaName; + jobConf.toTable.tableName = tableName; + jobConf.toTable.columns = tableColumns; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcToInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, "INSERT INTO " + fullTableName + " (" + tableColumns + ") VALUES (?,?)"); + } + + @SuppressWarnings("unchecked") + public void testTableSqlWithSchema() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ToJobConfiguration jobConf = new ToJobConfiguration(); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.toTable.schemaName = schemaName; + jobConf.toTable.sql = tableSql; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcToInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, "INSERT INTO " + executor.delimitIdentifier(tableName) + " VALUES (?,?,?)"); + } + + private void verifyResult(MutableContext context, String dataSql) { + assertEquals(dataSql, context.getString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL)); + } + + private void createTable(String tableName) { + try { + executor.executeUpdate("DROP TABLE " + tableName); + } catch(SqoopException e) { + //Ok to fail as the table might not exist + } + executor.executeUpdate("CREATE TABLE " + tableName + + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); + } + + public void testNonExistingStageTable() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ToJobConfiguration jobConf = new ToJobConfiguration(); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.toTable.tableName = schemalessTableName; + jobConf.toTable.stageTableName = stageTableName; + + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcToInitializer(); + try { + initializer.initialize(initializerContext, connConf, jobConf); + fail("Initialization should fail for non-existing stage table."); + } catch(SqoopException se) { + //expected + } + } + + @SuppressWarnings("unchecked") + public void testNonEmptyStageTable() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ToJobConfiguration jobConf = new ToJobConfiguration(); + + String fullStageTableName = executor.delimitIdentifier(stageTableName); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.toTable.tableName = schemalessTableName; + jobConf.toTable.stageTableName = stageTableName; + createTable(fullStageTableName); + executor.executeUpdate("INSERT INTO " + fullStageTableName + + " VALUES(1, 1.1, 'one')"); + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcToInitializer(); + try { + initializer.initialize(initializerContext, connConf, jobConf); + fail("Initialization should fail for non-empty stage table."); + } catch(SqoopException se) { + //expected + } + } + + @SuppressWarnings("unchecked") + public void testClearStageTableValidation() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ToJobConfiguration jobConf = new ToJobConfiguration(); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + //specifying clear stage table flag without specifying name of + // the stage table + jobConf.toTable.tableName = schemalessTableName; + jobConf.toTable.clearStageTable = false; + GenericJdbcValidator validator = new GenericJdbcValidator(); + Validation validation = validator.validateJob(jobConf); + assertEquals("User should not specify clear stage table flag without " + + "specifying name of the stage table", + Status.UNACCEPTABLE, + validation.getStatus()); + assertTrue(validation.getMessages().containsKey( + new Validation.FormInput("toTable"))); + + jobConf.toTable.clearStageTable = true; + validation = validator.validateJob(jobConf); + assertEquals("User should not specify clear stage table flag without " + + "specifying name of the stage table", + Status.UNACCEPTABLE, + validation.getStatus()); + assertTrue(validation.getMessages().containsKey( + new Validation.FormInput("toTable"))); + } + + @SuppressWarnings("unchecked") + public void testStageTableWithoutTable() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ToJobConfiguration jobConf = new ToJobConfiguration(); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + //specifying stage table without specifying table name + jobConf.toTable.stageTableName = stageTableName; + jobConf.toTable.sql = ""; + + GenericJdbcValidator validator = new GenericJdbcValidator(); + Validation validation = validator.validateJob(jobConf); + assertEquals("Stage table name cannot be specified without specifying " + + "table name", Status.UNACCEPTABLE, validation.getStatus()); + assertTrue(validation.getMessages().containsKey( + new Validation.FormInput("toTable"))); + } + + @SuppressWarnings("unchecked") + public void testClearStageTable() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ToJobConfiguration jobConf = new ToJobConfiguration(); + + String fullStageTableName = executor.delimitIdentifier(stageTableName); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.toTable.tableName = schemalessTableName; + jobConf.toTable.stageTableName = stageTableName; + jobConf.toTable.clearStageTable = true; + createTable(fullStageTableName); + executor.executeUpdate("INSERT INTO " + fullStageTableName + + " VALUES(1, 1.1, 'one')"); + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcToInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + assertEquals("Stage table should have been cleared", 0, + executor.getTableRowCount(stageTableName)); + } + + @SuppressWarnings("unchecked") + public void testStageTable() throws Exception { + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ToJobConfiguration jobConf = new ToJobConfiguration(); + + String fullStageTableName = executor.delimitIdentifier(stageTableName); + + connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER; + connConf.connection.connectionString = GenericJdbcTestConstants.URL; + jobConf.toTable.tableName = schemalessTableName; + jobConf.toTable.stageTableName = stageTableName; + createTable(fullStageTableName); + MutableContext context = new MutableMapContext(); + InitializerContext initializerContext = new InitializerContext(context); + + @SuppressWarnings("rawtypes") + Initializer initializer = new GenericJdbcToInitializer(); + initializer.initialize(initializerContext, connConf, jobConf); + + verifyResult(context, "INSERT INTO " + fullStageTableName + + " VALUES (?,?,?)"); + } +}
