Updated Branches: refs/heads/sqoop2 6d1779a05 -> 84127409a
SQOOP-1013: Sqoop2: Provide splitters for additional data types to Generic JDBC Connector (Venkat Ranganathan via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/84127409 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/84127409 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/84127409 Branch: refs/heads/sqoop2 Commit: 84127409a64aeacc220f21566267cf775b60958f Parents: 6d1779a Author: Jarek Jarcec Cecho <[email protected]> Authored: Thu Jun 13 13:39:43 2013 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Thu Jun 13 13:39:43 2013 -0700 ---------------------------------------------------------------------- .../jdbc/GenericJdbcImportPartitioner.java | 338 +++++++++++++++++-- .../connector/jdbc/TestImportPartitioner.java | 184 ++++++++++ 2 files changed, 499 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/84127409/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java index f80f30d..eef18f2 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java @@ -18,6 +18,9 @@ package org.apache.sqoop.connector.jdbc; import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; import java.sql.Types; import java.util.LinkedList; import java.util.List; @@ -33,6 +36,7 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur private static final BigDecimal NUMERIC_MIN_INCREMENT = new BigDecimal(10000 * Double.MIN_VALUE); + private long numberPartitions; private String partitionColumnName; private int partitionColumnType; @@ -47,6 +51,14 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur partitionMinValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE); partitionMaxValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE); + if (partitionMinValue == null && partitionMaxValue == null) { + List<Partition> partitions = new LinkedList<Partition>(); + GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); + partition.setConditions(partitionColumnName + "IS NULL"); + partitions.add(partition); + return partitions; + } + switch (partitionColumnType) { case Types.TINYINT: case Types.SMALLINT: @@ -69,19 +81,19 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur case Types.BIT: case Types.BOOLEAN: // Boolean column - // TODO: Add partition function + return partitionBooleanColumn(); case Types.DATE: case Types.TIME: case Types.TIMESTAMP: // Date time column - // TODO: Add partition function + return partitionDateTimeColumn(); case Types.CHAR: case Types.VARCHAR: case Types.LONGVARCHAR: // Text column - // TODO: Add partition function + return partitionTextColumn(); default: throw new SqoopException( @@ -89,18 +101,168 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur String.valueOf(partitionColumnType)); } } + protected List<Partition> partitionDateTimeColumn() { + List<Partition> partitions = new LinkedList<Partition>(); - protected List<Partition> partitionIntegerColumn() { + long minDateValue = 0; + long maxDateValue = 0; + + switch(partitionColumnType) { + case Types.DATE: + minDateValue = Date.valueOf(partitionMinValue).getTime(); + maxDateValue = Date.valueOf(partitionMaxValue).getTime(); + break; + case Types.TIME: + minDateValue = Time.valueOf(partitionMinValue).getTime(); + maxDateValue = Time.valueOf(partitionMaxValue).getTime(); + break; + case Types.TIMESTAMP: + minDateValue = Timestamp.valueOf(partitionMinValue).getTime(); + maxDateValue = Timestamp.valueOf(partitionMaxValue).getTime(); + break; + } + long interval = (maxDateValue - minDateValue) / numberPartitions; + long remainder = (maxDateValue - minDateValue) % numberPartitions; + + if (interval == 0) { + numberPartitions = (int)remainder; + } + long lowerBound; + long upperBound = minDateValue; + + Object objLB = null; + Object objUB = null; + + for (int i = 1; i < numberPartitions; i++) { + lowerBound = upperBound; + upperBound = lowerBound + interval; + upperBound += (i <= remainder) ? 1 : 0; + + switch(partitionColumnType) { + case Types.DATE: + objLB = new Date(lowerBound); + objUB = new Date(upperBound); + break; + case Types.TIME: + objLB = new Time(lowerBound); + objUB = new Time(upperBound); + break; + case Types.TIMESTAMP: + objLB = new Timestamp(lowerBound); + objUB = new Timestamp(upperBound); + break; + } + + GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); + partition.setConditions( + constructDateConditions(objLB, objUB, false)); + partitions.add(partition); + } + switch(partitionColumnType) { + case Types.DATE: + objLB = new Date(upperBound); + objUB = new Date(maxDateValue); + break; + case Types.TIME: + objLB = new Time(upperBound); + objUB = new Time(maxDateValue); + break; + case Types.TIMESTAMP: + objLB = new Timestamp(upperBound); + objUB = new Timestamp(maxDateValue); + break; + } + GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); + partition.setConditions( + constructDateConditions(objLB, objUB, true)); + partitions.add(partition); + return partitions; + } + + protected List<Partition> partitionTextColumn() { List<Partition> partitions = new LinkedList<Partition>(); - if (partitionMinValue == null && partitionMaxValue == null) { + String minStringValue = null; + String maxStringValue = null; + + // Remove common prefix if any as it does not affect outcome. + int maxPrefixLen = Math.min(partitionMinValue.length(), + partitionMaxValue.length()); + // Calculate common prefix length + int cpLen = 0; + + for (cpLen = 0; cpLen < maxPrefixLen; cpLen++) { + char c1 = partitionMinValue.charAt(cpLen); + char c2 = partitionMaxValue.charAt(cpLen); + if (c1 != c2) { + break; + } + } + + // The common prefix has length 'sharedLen'. Extract it from both. + String prefix = partitionMinValue.substring(0, cpLen); + minStringValue = partitionMinValue.substring(cpLen); + maxStringValue = partitionMaxValue.substring(cpLen); + + BigDecimal minStringBD = textToBigDecimal(minStringValue); + BigDecimal maxStringBD = textToBigDecimal(maxStringValue); + + // Having one single value means that we can create only one single split + if(minStringBD.equals(maxStringBD)) { GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); - partition.setConditions(partitionColumnName + "IS NULL"); + partition.setConditions(constructTextConditions(prefix, maxStringBD)); partitions.add(partition); return partitions; } - long minValue = Long.parseLong(partitionMinValue); + // Get all the split points together. + List<BigDecimal> splitPoints = new LinkedList<BigDecimal>(); + + BigDecimal splitSize = divide(maxStringBD.subtract(minStringBD), + new BigDecimal(numberPartitions)); + if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) { + splitSize = NUMERIC_MIN_INCREMENT; + } + + BigDecimal curVal = minStringBD; + + while (curVal.compareTo(maxStringBD) <= 0) { + splitPoints.add(curVal); + curVal = curVal.add(splitSize); + } + + if (splitPoints.size() == 0 + || splitPoints.get(0).compareTo(minStringBD) != 0) { + splitPoints.add(0, minStringBD); + } + + if (splitPoints.get(splitPoints.size() - 1).compareTo(maxStringBD) != 0 + || splitPoints.size() == 1) { + splitPoints.add(maxStringBD); + } + + // Turn the split points into a set of string intervals. + BigDecimal start = splitPoints.get(0); + for (int i = 1; i < splitPoints.size(); i++) { + BigDecimal end = splitPoints.get(i); + + GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); + partition.setConditions(constructTextConditions(prefix, start, + end, i == splitPoints.size() - 1)); + partitions.add(partition); + + start = end; + } + + return partitions; + } + + + protected List<Partition> partitionIntegerColumn() { + List<Partition> partitions = new LinkedList<Partition>(); + + long minValue = partitionMinValue == null ? Long.MIN_VALUE + : Long.parseLong(partitionMinValue); long maxValue = Long.parseLong(partitionMaxValue); long interval = (maxValue - minValue) / numberPartitions; @@ -134,14 +296,9 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur protected List<Partition> partitionFloatingPointColumn() { List<Partition> partitions = new LinkedList<Partition>(); - if (partitionMinValue == null && partitionMaxValue == null) { - GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); - partition.setConditions(partitionColumnName + "IS NULL"); - partitions.add(partition); - return partitions; - } - double minValue = Double.parseDouble(partitionMinValue); + double minValue = partitionMinValue == null ? Double.MIN_VALUE + : Double.parseDouble(partitionMinValue); double maxValue = Double.parseDouble(partitionMaxValue); double interval = (maxValue - minValue) / numberPartitions; @@ -168,15 +325,6 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur protected List<Partition> partitionNumericColumn() { List<Partition> partitions = new LinkedList<Partition>(); - - // All null values will result in single partition - if (partitionMinValue == null && partitionMaxValue == null) { - GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); - partition.setConditions(partitionColumnName + "IS NULL"); - partitions.add(partition); - return partitions; - } - // Having one end in null is not supported if (partitionMinValue == null || partitionMaxValue == null) { throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0015); @@ -190,12 +338,14 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); partition.setConditions(constructConditions(minValue)); partitions.add(partition); + return partitions; } // Get all the split points together. List<BigDecimal> splitPoints = new LinkedList<BigDecimal>(); BigDecimal splitSize = divide(maxValue.subtract(minValue), new BigDecimal(numberPartitions)); + if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) { splitSize = NUMERIC_MIN_INCREMENT; } @@ -227,6 +377,60 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur return partitions; } + protected List<Partition> partitionBooleanColumn() { + List<Partition> partitions = new LinkedList<Partition>(); + + + Boolean minValue = parseBooleanValue(partitionMinValue); + Boolean maxValue = parseBooleanValue(partitionMaxValue); + + StringBuilder conditions = new StringBuilder(); + + // Having one single value means that we can create only one single split + if(minValue.equals(maxValue)) { + GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); + + conditions.append(partitionColumnName).append(" = ") + .append(maxValue); + partition.setConditions(conditions.toString()); + partitions.add(partition); + return partitions; + } + + GenericJdbcImportPartition partition = new GenericJdbcImportPartition(); + + if (partitionMinValue == null) { + conditions = new StringBuilder(); + conditions.append(partitionColumnName).append(" IS NULL"); + partition.setConditions(conditions.toString()); + partitions.add(partition); + } + partition = new GenericJdbcImportPartition(); + conditions = new StringBuilder(); + conditions.append(partitionColumnName).append(" = TRUE"); + partition.setConditions(conditions.toString()); + partitions.add(partition); + partition = new GenericJdbcImportPartition(); + conditions = new StringBuilder(); + conditions.append(partitionColumnName).append(" = FALSE"); + partition.setConditions(conditions.toString()); + partitions.add(partition); + return partitions; + } + + private Boolean parseBooleanValue(String value) { + if (value == null) { + return null; + } + if (value.equals("1")) { + return Boolean.TRUE; + } else if (value.equals("0")) { + return Boolean.FALSE; + } else { + return Boolean.parseBoolean(value); + } + } + protected BigDecimal divide(BigDecimal numerator, BigDecimal denominator) { try { return numerator.divide(denominator); @@ -256,4 +460,92 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur .toString() ; } + + protected String constructDateConditions( + Object lowerBound, Object upperBound, boolean lastOne) { + StringBuilder conditions = new StringBuilder(); + conditions.append('\'').append(lowerBound.toString()).append('\''); + conditions.append(" <= "); + conditions.append(partitionColumnName); + conditions.append(" AND "); + conditions.append(partitionColumnName); + conditions.append(lastOne ? " <= " : " < "); + conditions.append('\'').append(upperBound.toString()).append('\''); + return conditions.toString(); + } + + protected String constructTextConditions(String prefix, + Object lowerBound, Object upperBound, boolean lastOne) { + StringBuilder conditions = new StringBuilder(); + String lbString = prefix + bigDecimalToText((BigDecimal)lowerBound); + String ubString = prefix + bigDecimalToText((BigDecimal)upperBound); + conditions.append('\'').append(lbString).append('\''); + conditions.append(" <= "); + conditions.append(partitionColumnName); + conditions.append(" AND "); + conditions.append(partitionColumnName); + conditions.append(lastOne ? " <= " : " < "); + conditions.append('\'').append(ubString).append('\''); + return conditions.toString(); + } + + protected String constructTextConditions(String prefix, Object value) { + return new StringBuilder() + .append(partitionColumnName) + .append(" = ").append('\'') + .append(prefix + bigDecimalToText((BigDecimal)value)) + .append('\'').toString() + ; + } + + + /** + * Converts a string to a BigDecimal representation in Base 2^21 format. + * The maximum Unicode code point value defined is 10FFFF. Although + * not all database system support UTF16 and mostly we expect UCS2 + * characters only, for completeness, we assume that all the unicode + * characters are supported. + * Given a string 's' containing characters s_0, s_1,..s_n, + * the string is interpreted as the number: 0.s_0 s_1 s_2 s_3 s_48) + * This can be split and each split point can be converted back to + * a string value for comparison purposes. The number of characters + * is restricted to prevent repeating fractions and rounding errors + * towards the higher fraction positions. + */ + private static final BigDecimal UNITS_BASE = new BigDecimal(2097152); + private static final int MAX_CHARS_TO_CONVERT = 4; + + private BigDecimal textToBigDecimal(String str) { + BigDecimal result = BigDecimal.ZERO; + BigDecimal divisor = UNITS_BASE; + + int len = Math.min(str.length(), MAX_CHARS_TO_CONVERT); + + for (int n = 0; n < len; ) { + int codePoint = str.codePointAt(n); + n += Character.charCount(codePoint); + BigDecimal val = divide(new BigDecimal(codePoint), divisor); + result = result.add(val); + divisor = divisor.multiply(UNITS_BASE); + } + + return result; + } + + private String bigDecimalToText(BigDecimal bd) { + BigDecimal curVal = bd.stripTrailingZeros(); + StringBuilder sb = new StringBuilder(); + + for (int n = 0; n < MAX_CHARS_TO_CONVERT; ++n) { + curVal = curVal.multiply(UNITS_BASE); + int cp = curVal.intValue(); + if (0 == cp) { + break; + } + curVal = curVal.subtract(new BigDecimal(cp)); + sb.append(Character.toChars(cp)); + } + return sb.toString(); + } + } http://git-wip-us.apache.org/repos/asf/sqoop/blob/84127409/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java index ee314d0..522a515 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java @@ -18,6 +18,9 @@ package org.apache.sqoop.connector.jdbc; import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; import java.sql.Types; import java.util.Iterator; import java.util.List; @@ -26,6 +29,7 @@ import junit.framework.TestCase; import org.apache.sqoop.common.MutableContext; import org.apache.sqoop.common.MutableMapContext; +import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration; import org.apache.sqoop.job.Constants; @@ -257,6 +261,186 @@ public class TestImportPartitioner extends TestCase { }); } + + public void testDatePartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "DCOL"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.DATE)); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, + Date.valueOf("2013-01-01").toString()); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MAXVALUE, Date.valueOf("2013-12-31").toString()); + + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ImportJobConfiguration jobConf = new ImportJobConfiguration(); + + Partitioner partitioner = new GenericJdbcImportPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, + 3); + List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + verifyResult(partitions, new String[]{ + "'2013-01-01' <= DCOL AND DCOL < '2013-05-02'", + "'2013-05-02' <= DCOL AND DCOL < '2013-08-31'", + "'2013-08-31' <= DCOL AND DCOL <= '2013-12-31'", + }); + + } + + public void testTimePartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "TCOL"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.TIME)); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, + Time.valueOf("01:01:01").toString()); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, + Time.valueOf("10:40:50").toString()); + + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ImportJobConfiguration jobConf = new ImportJobConfiguration(); + + Partitioner partitioner = new GenericJdbcImportPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, + 3); + List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + verifyResult(partitions, new String[]{ + "'01:01:01' <= TCOL AND TCOL < '04:14:17'", + "'04:14:17' <= TCOL AND TCOL < '07:27:33'", + "'07:27:33' <= TCOL AND TCOL <= '10:40:50'", + }); + } + + public void testTimestampPartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "TSCOL"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.TIMESTAMP)); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, + Timestamp.valueOf("2013-01-01 01:01:01.123").toString()); + context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, + Timestamp.valueOf("2013-12-31 10:40:50.654").toString()); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ImportJobConfiguration jobConf = new ImportJobConfiguration(); + + Partitioner partitioner = new GenericJdbcImportPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, + 3); + List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + verifyResult(partitions, new String[]{ + "'2013-01-01 01:01:01.123' <= TSCOL AND TSCOL < '2013-05-02 13:14:17.634'", + "'2013-05-02 13:14:17.634' <= TSCOL AND TSCOL < '2013-09-01 00:27:34.144'", + "'2013-09-01 00:27:34.144' <= TSCOL AND TSCOL <= '2013-12-31 10:40:50.654'", + }); + } + + public void testBooleanPartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "BCOL"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.BOOLEAN)); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MINVALUE, "0"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MAXVALUE, "1"); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ImportJobConfiguration jobConf = new ImportJobConfiguration(); + + Partitioner partitioner = new GenericJdbcImportPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, + 3); + List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + verifyResult(partitions, new String[]{ + "BCOL = TRUE", + "BCOL = FALSE", + }); + } + + public void testVarcharPartition() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR)); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MINVALUE, "A"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MAXVALUE, "Z"); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ImportJobConfiguration jobConf = new ImportJobConfiguration(); + + Partitioner partitioner = new GenericJdbcImportPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, + 25); + List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + + verifyResult(partitions, new String[] { + "'A' <= VCCOL AND VCCOL < 'B'", + "'B' <= VCCOL AND VCCOL < 'C'", + "'C' <= VCCOL AND VCCOL < 'D'", + "'D' <= VCCOL AND VCCOL < 'E'", + "'E' <= VCCOL AND VCCOL < 'F'", + "'F' <= VCCOL AND VCCOL < 'G'", + "'G' <= VCCOL AND VCCOL < 'H'", + "'H' <= VCCOL AND VCCOL < 'I'", + "'I' <= VCCOL AND VCCOL < 'J'", + "'J' <= VCCOL AND VCCOL < 'K'", + "'K' <= VCCOL AND VCCOL < 'L'", + "'L' <= VCCOL AND VCCOL < 'M'", + "'M' <= VCCOL AND VCCOL < 'N'", + "'N' <= VCCOL AND VCCOL < 'O'", + "'O' <= VCCOL AND VCCOL < 'P'", + "'P' <= VCCOL AND VCCOL < 'Q'", + "'Q' <= VCCOL AND VCCOL < 'R'", + "'R' <= VCCOL AND VCCOL < 'S'", + "'S' <= VCCOL AND VCCOL < 'T'", + "'T' <= VCCOL AND VCCOL < 'U'", + "'U' <= VCCOL AND VCCOL < 'V'", + "'V' <= VCCOL AND VCCOL < 'W'", + "'W' <= VCCOL AND VCCOL < 'X'", + "'X' <= VCCOL AND VCCOL < 'Y'", + "'Y' <= VCCOL AND VCCOL <= 'Z'", + }); + } + + public void testVarcharPartitionWithCommonPrefix() throws Exception { + MutableContext context = new MutableMapContext(); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR)); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MINVALUE, "AAA"); + context.setString(GenericJdbcConnectorConstants + .CONNECTOR_JDBC_PARTITION_MAXVALUE, "AAF"); + + ConnectionConfiguration connConf = new ConnectionConfiguration(); + ImportJobConfiguration jobConf = new ImportJobConfiguration(); + + Partitioner partitioner = new GenericJdbcImportPartitioner(); + PartitionerContext partitionerContext = new PartitionerContext(context, + 5); + + List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf); + + verifyResult(partitions, new String[] { + "'AAA' <= VCCOL AND VCCOL < 'AAB'", + "'AAB' <= VCCOL AND VCCOL < 'AAC'", + "'AAC' <= VCCOL AND VCCOL < 'AAD'", + "'AAD' <= VCCOL AND VCCOL < 'AAE'", + "'AAE' <= VCCOL AND VCCOL <= 'AAF'", + }); + + } + private void verifyResult(List<Partition> partitions, String[] expected) { assertEquals(expected.length, partitions.size());
