Repository: sqoop Updated Branches: refs/heads/trunk fee80ac01 -> 5fc7a680f
SQOOP-1369: Avro export ignores --columns option (Paul Mazak via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/5fc7a680 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/5fc7a680 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/5fc7a680 Branch: refs/heads/trunk Commit: 5fc7a680f854c4a03e9a581079172d453d1a059c Parents: fee80ac Author: Jarek Jarcec Cecho <[email protected]> Authored: Tue Sep 15 07:58:53 2015 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Tue Sep 15 07:58:53 2015 -0700 ---------------------------------------------------------------------- src/java/org/apache/sqoop/SqoopOptions.java | 11 ++ .../sqoop/manager/CatalogQueryManager.java | 2 +- .../org/apache/sqoop/manager/OracleManager.java | 2 +- .../org/apache/sqoop/manager/SqlManager.java | 21 ++- .../apache/sqoop/mapreduce/JdbcExportJob.java | 12 +- src/test/com/cloudera/sqoop/TestAvroExport.java | 22 +++ src/test/org/apache/sqoop/TestSqoopOptions.java | 10 ++ .../apache/sqoop/manager/TestSqlManager.java | 78 +++++++++++ .../sqoop/mapreduce/TestJdbcExportJob.java | 137 +++++++++++++++++++ 9 files changed, 288 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/5fc7a680/src/java/org/apache/sqoop/SqoopOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java index ace90fd..db92b30 100644 --- a/src/java/org/apache/sqoop/SqoopOptions.java +++ b/src/java/org/apache/sqoop/SqoopOptions.java @@ -1175,6 +1175,17 @@ public class SqoopOptions implements Cloneable { } } + public String getColumnNameCaseInsensitive(String col){ + if (null != columns) { + for(String columnName : columns) { + if(columnName.equalsIgnoreCase(col)) { + return columnName; + } + } + } + return null; + } + public void setColumns(String [] cols) { if (null == cols) { this.columns = null; http://git-wip-us.apache.org/repos/asf/sqoop/blob/5fc7a680/src/java/org/apache/sqoop/manager/CatalogQueryManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/CatalogQueryManager.java b/src/java/org/apache/sqoop/manager/CatalogQueryManager.java index 4e063ed..7b2ee78 100644 --- a/src/java/org/apache/sqoop/manager/CatalogQueryManager.java +++ b/src/java/org/apache/sqoop/manager/CatalogQueryManager.java @@ -180,7 +180,7 @@ public abstract class CatalogQueryManager } } - return columns.toArray(new String[columns.size()]); + return filterSpecifiedColumnNames(columns.toArray(new String[columns.size()])); } protected abstract String getPrimaryKeyQuery(String tableName); http://git-wip-us.apache.org/repos/asf/sqoop/blob/5fc7a680/src/java/org/apache/sqoop/manager/OracleManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/OracleManager.java b/src/java/org/apache/sqoop/manager/OracleManager.java index 69b613f..d088265 100644 --- a/src/java/org/apache/sqoop/manager/OracleManager.java +++ b/src/java/org/apache/sqoop/manager/OracleManager.java @@ -980,7 +980,7 @@ public class OracleManager } } - return columns.toArray(new String[columns.size()]); + return filterSpecifiedColumnNames(columns.toArray(new String[columns.size()])); } @Override http://git-wip-us.apache.org/repos/asf/sqoop/blob/5fc7a680/src/java/org/apache/sqoop/manager/SqlManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/SqlManager.java b/src/java/org/apache/sqoop/manager/SqlManager.java index ead581d..768507b 100644 --- a/src/java/org/apache/sqoop/manager/SqlManager.java +++ b/src/java/org/apache/sqoop/manager/SqlManager.java @@ -113,7 +113,26 @@ public abstract class SqlManager /** {@inheritDoc} */ public String[] getColumnNames(String tableName) { String stmt = getColNamesQuery(tableName); - return getColumnNamesForRawQuery(stmt); + return filterSpecifiedColumnNames(getColumnNamesForRawQuery(stmt)); + } + + /** + * Utilize the --columns option, if specified. + * @param columns + * @return the subset of columns which were specified by --columns option. + */ + protected String[] filterSpecifiedColumnNames(String[] columns) { + if (options.getColumns() == null) { + return columns; + } + List<String> colNames = new ArrayList<String>(); + for (String col : columns) { + String userColName = options.getColumnNameCaseInsensitive(col); + if (userColName != null) { + colNames.add(userColName); + } + } + return colNames.toArray(new String[colNames.size()]); } @Override http://git-wip-us.apache.org/repos/asf/sqoop/blob/5fc7a680/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java index 93d438a..78df33c 100644 --- a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java @@ -97,12 +97,16 @@ public class JdbcExportJob extends ExportJobBase { columnTypeInts = connManager.getColumnTypesForProcedure( options.getCall()); } + String[] specifiedColumns = options.getColumns(); MapWritable columnTypes = new MapWritable(); for (Map.Entry<String, Integer> e : columnTypeInts.entrySet()) { - Text columnName = new Text(e.getKey()); - Text columnText = new Text( - connManager.toJavaType(tableName, e.getKey(), e.getValue())); - columnTypes.put(columnName, columnText); + String column = e.getKey(); + column = (specifiedColumns == null) ? column : options.getColumnNameCaseInsensitive(column); + if (column != null) { + Text columnName = new Text(column); + Text columnType = new Text(connManager.toJavaType(tableName, column, e.getValue())); + columnTypes.put(columnName, columnType); + } } DefaultStringifier.store(job.getConfiguration(), columnTypes, AvroExportMapper.AVRO_COLUMN_TYPES_MAP); http://git-wip-us.apache.org/repos/asf/sqoop/blob/5fc7a680/src/test/com/cloudera/sqoop/TestAvroExport.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/TestAvroExport.java b/src/test/com/cloudera/sqoop/TestAvroExport.java index 663828c..1d8f5df 100644 --- a/src/test/com/cloudera/sqoop/TestAvroExport.java +++ b/src/test/com/cloudera/sqoop/TestAvroExport.java @@ -404,4 +404,26 @@ public class TestAvroExport extends ExportJobTestCase { } } + public void testSpecifiedColumnsAsAvroFields() throws IOException, SQLException { + final int TOTAL_RECORDS = 10; + ColumnGenerator[] gens = new ColumnGenerator[] { + colGenerator(000, Schema.create(Schema.Type.INT), 100, "INTEGER"), //col0 + colGenerator(111, Schema.create(Schema.Type.INT), 100, "INTEGER"), //col1 + colGenerator(222, Schema.create(Schema.Type.INT), 100, "INTEGER"), //col2 + colGenerator(333, Schema.create(Schema.Type.INT), 100, "INTEGER") //col3 + }; + createAvroFile(0, TOTAL_RECORDS, gens); + createTable(gens); + runExport(getArgv(true, 10, 10, newStrArray(null, "-m", "" + 1, "--columns", "id,msg,col1,col2"))); + verifyExport(TOTAL_RECORDS); + assertColValForRowId(0, "col0", null); + assertColValForRowId(0, "col1", 111); + assertColValForRowId(0, "col2", 222); + assertColValForRowId(0, "col3", null); + assertColValForRowId(9, "col0", null); + assertColValForRowId(9, "col1", 111); + assertColValForRowId(9, "col2", 222); + assertColValForRowId(9, "col3", null); + } + } http://git-wip-us.apache.org/repos/asf/sqoop/blob/5fc7a680/src/test/org/apache/sqoop/TestSqoopOptions.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/TestSqoopOptions.java b/src/test/org/apache/sqoop/TestSqoopOptions.java index 87b340a..fdb8c8d 100644 --- a/src/test/org/apache/sqoop/TestSqoopOptions.java +++ b/src/test/org/apache/sqoop/TestSqoopOptions.java @@ -34,4 +34,14 @@ public class TestSqoopOptions extends TestCase { } }.testParseColumnMapping(); } + + public void testColumnNameCaseInsensitive() { + SqoopOptions opts = new SqoopOptions(); + opts.setColumns(new String[]{ "AAA", "bbb" }); + assertEquals("AAA", opts.getColumnNameCaseInsensitive("aAa")); + assertEquals("bbb", opts.getColumnNameCaseInsensitive("BbB")); + assertEquals(null, opts.getColumnNameCaseInsensitive("notFound")); + opts.setColumns(null); + assertEquals(null, opts.getColumnNameCaseInsensitive("noColumns")); + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/5fc7a680/src/test/org/apache/sqoop/manager/TestSqlManager.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/manager/TestSqlManager.java b/src/test/org/apache/sqoop/manager/TestSqlManager.java new file mode 100644 index 0000000..08413b0 --- /dev/null +++ b/src/test/org/apache/sqoop/manager/TestSqlManager.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.manager; + +import static org.junit.Assert.assertArrayEquals; + +import java.sql.Connection; +import java.sql.SQLException; + +import org.junit.Test; + +import com.cloudera.sqoop.SqoopOptions; + +import junit.framework.TestCase; + +/** + * Test methods of the generic SqlManager implementation. + */ +public class TestSqlManager extends TestCase { + + @Test + public void testFilteringSpecifiedColumnNamesWhenNoneSpecified() { + SqoopOptions opts = new SqoopOptions(); + SqlManager sqlManager = stubSqlManager(opts); + String[] allColumnsFromDbTable = { "aaa", "bbb", "ccc", "ddd" }; + assertArrayEquals(new String[]{ "aaa", "bbb", "ccc", "ddd" }, sqlManager.filterSpecifiedColumnNames(allColumnsFromDbTable)); + } + + @Test + public void testFilteringSpecifiedColumnNamesWhenSubset() { + SqoopOptions opts = new SqoopOptions(); + String[] cols = { "bbb", "ccc" }; + opts.setColumns(cols); + SqlManager sqlManager = stubSqlManager(opts); + String[] allColumnsFromDbTable = { "aaa", "bbb", "ccc", "ddd" }; + assertArrayEquals(new String[]{ "bbb", "ccc" }, sqlManager.filterSpecifiedColumnNames(allColumnsFromDbTable)); + } + + @Test + public void testFilteringSpecifiedColumnNamesUsesCaseFromArgumentNotDatabase() { + SqoopOptions opts = new SqoopOptions(); + String[] cols = { "bbb", "ccc" }; + opts.setColumns(cols); + SqlManager sqlManager = stubSqlManager(opts); + String[] allColumnsFromDbTable = { "AAA", "BBB", "CCC", "DDD" }; + assertArrayEquals(new String[]{ "bbb", "ccc" }, sqlManager.filterSpecifiedColumnNames(allColumnsFromDbTable)); + } + + private SqlManager stubSqlManager(SqoopOptions opts) { + SqlManager sqlManager = new SqlManager(opts) { + @Override + public Connection getConnection() throws SQLException { + return null; + } + @Override + public String getDriverClass() { + return null; + } + }; + return sqlManager; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/5fc7a680/src/test/org/apache/sqoop/mapreduce/TestJdbcExportJob.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/mapreduce/TestJdbcExportJob.java b/src/test/org/apache/sqoop/mapreduce/TestJdbcExportJob.java new file mode 100644 index 0000000..19440ff --- /dev/null +++ b/src/test/org/apache/sqoop/mapreduce/TestJdbcExportJob.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce; + +import java.io.IOException; +import java.sql.Types; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.io.DefaultStringifier; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.sqoop.mapreduce.ExportJobBase.FileType; +import org.junit.Test; + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.manager.ConnManager; +import com.cloudera.sqoop.manager.ExportJobContext; + +import junit.framework.TestCase; + +/** + * Test methods of the JdbcExportJob implementation. + */ +public class TestJdbcExportJob extends TestCase { + + @Test + public void testAvroWithNoColumnsSpecified() throws Exception { + SqoopOptions opts = new SqoopOptions(); + opts.setExportDir("myexportdir"); + JdbcExportJob jdbcExportJob = stubJdbcExportJob(opts, FileType.AVRO_DATA_FILE); + Job job = new Job(); + jdbcExportJob.configureInputFormat(job, null, null, null); + assertEquals(asSetOfText("Age", "Name", "Gender"), DefaultStringifier.load(job.getConfiguration(), AvroExportMapper.AVRO_COLUMN_TYPES_MAP, MapWritable.class).keySet()); + } + + @Test + public void testAvroWithAllColumnsSpecified() throws Exception { + SqoopOptions opts = new SqoopOptions(); + opts.setExportDir("myexportdir"); + String[] columns = { "Age", "Name", "Gender" }; + opts.setColumns(columns); + JdbcExportJob jdbcExportJob = stubJdbcExportJob(opts, FileType.AVRO_DATA_FILE); + Job job = new Job(); + jdbcExportJob.configureInputFormat(job, null, null, null); + assertEquals(asSetOfText("Age", "Name", "Gender"), DefaultStringifier.load(job.getConfiguration(), AvroExportMapper.AVRO_COLUMN_TYPES_MAP, MapWritable.class).keySet()); + } + + @Test + public void testAvroWithOneColumnSpecified() throws Exception { + SqoopOptions opts = new SqoopOptions(); + opts.setExportDir("myexportdir"); + String[] columns = { "Gender" }; + opts.setColumns(columns); + JdbcExportJob jdbcExportJob = stubJdbcExportJob(opts, FileType.AVRO_DATA_FILE); + Job job = new Job(); + jdbcExportJob.configureInputFormat(job, null, null, null); + assertEquals(asSetOfText("Gender"), DefaultStringifier.load(job.getConfiguration(), AvroExportMapper.AVRO_COLUMN_TYPES_MAP, MapWritable.class).keySet()); + } + + @Test + public void testAvroWithSomeColumnsSpecified() throws Exception { + SqoopOptions opts = new SqoopOptions(); + opts.setExportDir("myexportdir"); + String[] columns = { "Age", "Name" }; + opts.setColumns(columns); + JdbcExportJob jdbcExportJob = stubJdbcExportJob(opts, FileType.AVRO_DATA_FILE); + Job job = new Job(); + jdbcExportJob.configureInputFormat(job, null, null, null); + assertEquals(asSetOfText("Age", "Name"), DefaultStringifier.load(job.getConfiguration(), AvroExportMapper.AVRO_COLUMN_TYPES_MAP, MapWritable.class).keySet()); + } + + @Test + public void testAvroWithMoreColumnsSpecified() throws Exception { + SqoopOptions opts = new SqoopOptions(); + opts.setExportDir("myexportdir"); + String[] columns = { "Age", "Name", "Gender", "Address" }; + opts.setColumns(columns); + JdbcExportJob jdbcExportJob = stubJdbcExportJob(opts, FileType.AVRO_DATA_FILE); + Job job = new Job(); + jdbcExportJob.configureInputFormat(job, null, null, null); + assertEquals(asSetOfText("Age", "Name", "Gender"), DefaultStringifier.load(job.getConfiguration(), AvroExportMapper.AVRO_COLUMN_TYPES_MAP, MapWritable.class).keySet()); + } + + private JdbcExportJob stubJdbcExportJob(SqoopOptions opts, final FileType inputFileType) throws IOException { + ExportJobContext mockContext = mock(ExportJobContext.class); + when(mockContext.getOptions()).thenReturn(opts); + ConnManager mockConnManager = mock(ConnManager.class); + Map<String, Integer> columnTypeInts = new HashMap<String, Integer>(); + columnTypeInts.put("Name", Types.VARCHAR); + columnTypeInts.put("Age", Types.SMALLINT); + columnTypeInts.put("Gender", Types.CHAR); + when(mockConnManager.getColumnTypes(anyString(), anyString())).thenReturn(columnTypeInts); + when(mockConnManager.toJavaType(anyString(), anyString(), anyInt())).thenReturn("String"); + when(mockContext.getConnManager()).thenReturn(mockConnManager); + JdbcExportJob jdbcExportJob = new JdbcExportJob(mockContext) { + @Override + protected FileType getInputFileType() { + return inputFileType; + } + }; + jdbcExportJob.options = opts; + return jdbcExportJob; + } + + private Set<Text> asSetOfText(String... strings) { + Set<Text> setOfText = new HashSet<Text>(); + for (String string : strings) { + setOfText.add(new Text(string)); + } + return setOfText; + } +} \ No newline at end of file
