http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/com/cloudera/sqoop/tool/TestToolPlugin.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/tool/TestToolPlugin.java b/src/test/com/cloudera/sqoop/tool/TestToolPlugin.java deleted file mode 100644 index da1ef65..0000000 --- a/src/test/com/cloudera/sqoop/tool/TestToolPlugin.java +++ /dev/null @@ -1,153 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.cloudera.sqoop.tool; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import org.apache.commons.cli.CommandLine; - -import org.apache.hadoop.conf.Configuration; - -import org.apache.hadoop.util.StringUtils; - -import com.cloudera.sqoop.Sqoop; -import com.cloudera.sqoop.SqoopOptions; - -import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException; - -import com.cloudera.sqoop.cli.ToolOptions; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -/** - * Test that tool plugins work. - */ -public class TestToolPlugin { - - public static final Log LOG = LogFactory - .getLog(TestToolPlugin.class.getName()); - - /** - * The plugin that registers the tool. - */ - public static class PluginClass extends ToolPlugin { - public List<ToolDesc> getTools() { - return Collections.singletonList(new ToolDesc("fooTool", - FooTool.class, "does foo things")); - } - } - - /** - * The dummy tool itself. - */ - public static class FooTool extends BaseSqoopTool { - /** Holds the name of the last user we "operated" as. */ - private static String lastUser; - static String getLastUser() { - return lastUser; - } - - private static void setLastUser(String last) { - lastUser = last; - } - - public FooTool() { - super("fooTool"); - } - - /** Just save the username and call it a day. */ - @Override - public int run(SqoopOptions opts) { - setLastUser(opts.getUsername()); - LOG.info("FooTool operating on user: " + lastUser); - return 0; - } - - @Override - public void configureOptions(ToolOptions toolOptions) { - toolOptions.addUniqueOptions(getCommonOptions()); - } - - @Override - public void applyOptions(CommandLine in, SqoopOptions out) - throws InvalidOptionsException { - applyCommonOptions(in, out); - } - - @Override - public void validateOptions(SqoopOptions options) - throws InvalidOptionsException { - validateCommonOptions(options); - } - } - - @Test - public void testPlugin() { - // Register the plugin with SqoopTool. 
- Configuration pluginConf = new Configuration(); - pluginConf.set(SqoopTool.TOOL_PLUGINS_KEY, PluginClass.class.getName()); - SqoopTool.loadPlugins(pluginConf); - - ArrayList<String> args = new ArrayList<String>(); - args.add("fooTool"); - args.add("--username"); - args.add("bob"); - args.add("--connect"); - args.add("anywhere"); - - int ret = Sqoop.runTool(args.toArray(new String[0])); - assertEquals("Expected tool run success", 0, ret); - - String actualUser = FooTool.getLastUser(); - assertEquals("Failed to set username correctly.", "bob", actualUser); - } - - /** - * Plugin class that tries to override an existing tool definition. - */ - public static class OverridePlugin extends ToolPlugin { - public List<ToolDesc> getTools() { - return Collections.singletonList(new ToolDesc("import", - FooTool.class, "replaces 'import' with foo")); - } - } - - @Test - public void testNoOverrideTools() { - // Test that you can't override an existing tool definition. First - // registration of a tool name wins. - Configuration pluginConf = new Configuration(); - pluginConf.set(SqoopTool.TOOL_PLUGINS_KEY, OverridePlugin.class.getName()); - try { - SqoopTool.loadPlugins(pluginConf); - fail("Successfully loaded a plugin that overrides 'import' tool."); - } catch (RuntimeException re) { - LOG.info("Got runtime exception registering plugin (expected; ok): " - + StringUtils.stringifyException(re)); - } - } -}
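The deleted test above exercised Sqoop's tool-plugin extension point: a ToolPlugin subclass publishes ToolDesc entries, and SqoopTool.loadPlugins() registers them from the configuration property named by SqoopTool.TOOL_PLUGINS_KEY, with the first registration of a tool name winning. A minimal standalone plugin written against the same API the test uses might look like the following sketch; the class and tool names are hypothetical, and it assumes the SqoopTool base class supplies no-op defaults for configureOptions/applyOptions/validateOptions (the test overrides them explicitly).

import java.util.Collections;
import java.util.List;

import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.tool.BaseSqoopTool;
import com.cloudera.sqoop.tool.ToolDesc;
import com.cloudera.sqoop.tool.ToolPlugin;

/** Hypothetical plugin registering one extra tool with Sqoop. */
public class GreeterPlugin extends ToolPlugin {
  @Override
  public List<ToolDesc> getTools() {
    // Use a name no built-in tool claims; reusing one such as "import"
    // makes SqoopTool.loadPlugins() throw, as testNoOverrideTools expects.
    return Collections.singletonList(
        new ToolDesc("greeterTool", GreeterTool.class, "prints a greeting"));
  }

  /** The tool the plugin registers. */
  public static class GreeterTool extends BaseSqoopTool {
    public GreeterTool() {
      super("greeterTool");
    }

    @Override
    public int run(SqoopOptions opts) {
      // getUsername() is populated from --username, as in FooTool above.
      System.out.println("Hello, " + opts.getUsername());
      return 0; // zero signals success to Sqoop.runTool()
    }
  }
}

Such a plugin would be activated the same way testPlugin does it: set the property named by SqoopTool.TOOL_PLUGINS_KEY to the plugin class name in the Configuration and call SqoopTool.loadPlugins(conf) before invoking Sqoop.runTool().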
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/com/cloudera/sqoop/util/TestOptionsFileExpansion.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/util/TestOptionsFileExpansion.java b/src/test/com/cloudera/sqoop/util/TestOptionsFileExpansion.java deleted file mode 100644 index d403f3b..0000000 --- a/src/test/com/cloudera/sqoop/util/TestOptionsFileExpansion.java +++ /dev/null @@ -1,436 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.cloudera.sqoop.util; - -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; - -import org.junit.Assert; - -import com.cloudera.sqoop.Sqoop; -import org.junit.Test; - -/** - * Tests various options file loading scenarios. - */ -public class TestOptionsFileExpansion { - - /** - * Text from options file 1. Each string represents a new line. - */ - private static final String[] OPTIONS_FILE_TEXT1 = new String[] { - "--foo", - "-bar", - "--", - "--XYZ", - }; - - /** - * Expected options parsed out from options file 1. - */ - private static final String[] OPTIONS_FILE_TEXT1_OUTPUT = new String[] { - "--foo", - "-bar", - "--", - "--XYZ", - }; - - /** - * Text for options file 2. Each string represents a new line. This - * contains empty lines, comments and options that extend to multiple lines. - */ - private static final String[] OPTIONS_FILE_TEXT2 = new String[] { - "--archives", - "tools.jar,archive.jar,test.jar,\\", - "ldap.jar,sasl.jar", - "--connect", - "jdbc:jdbcspy:localhost:1521:test", - "--username", - "superman", - "--password", - "", - "# Ironic password.", - "# No one will ever guess.", - "kryptonite", - }; - - /** - * Expected options parsed out from file 2. - */ - private static final String[] OPTIONS_FILE_TEXT2_OUTPUT = new String[] { - "--archives", - "tools.jar,archive.jar,test.jar,ldap.jar,sasl.jar", - "--connect", - "jdbc:jdbcspy:localhost:1521:test", - "--username", - "superman", - "--password", - "kryptonite", - }; - - /** - * Text for options file 3. This contains options that represent empty - * strings or strings that have leading and trailing spaces. - */ - private static final String[] OPTIONS_FILE_TEXT3 = new String[] { - "-", - "\" leading spaces\"", - "' leading and trailing spaces '", - "\"\"", - "''", - }; - - /** - * Expected options parsed out from file 3. - */ - private static final String[] OPTIONS_FILE_TEXT3_OUTPUT = new String[] { - "-", - " leading spaces", - " leading and trailing spaces ", - "", - "", - }; - - /** - * Text for options file 4. This file has an invalid entry in the last line - * which will cause it to fail to load.
- */ - private static final String[] OPTIONS_FILE_TEXT4 = new String[] { - "--abcd", - "--efgh", - "# foo", - "# bar", - "XYZ\\", - }; - - /** - * Text for options file 5. This file has an invalid entry in the second line - * where there is a starting single quote character that is not terminating. - */ - private static final String[] OPTIONS_FILE_TEXT5 = new String[] { - "-abcd", - "\'", - "--foo", - }; - - /** - * Text for options file 6. This file has an invalid entry in the second line - * where a quoted string extends into the following line. - */ - private static final String[] OPTIONS_FILE_TEXT6 = new String[] { - "--abcd", - "' the quick brown fox \\", - "jumped over the lazy dog'", - "--efgh", - }; - - @Test - public void testOptionsFiles() throws Exception { - checkOptionsFile(OPTIONS_FILE_TEXT1, OPTIONS_FILE_TEXT1_OUTPUT); - checkOptionsFile(OPTIONS_FILE_TEXT2, OPTIONS_FILE_TEXT2_OUTPUT); - checkOptionsFile(OPTIONS_FILE_TEXT3, OPTIONS_FILE_TEXT3_OUTPUT); - } - - @Test - public void testInvalidOptionsFile() { - checkInvalidOptionsFile(OPTIONS_FILE_TEXT4); - checkInvalidOptionsFile(OPTIONS_FILE_TEXT5); - } - - @Test - public void testMultilineQuotedText() { - try { - checkOptionsFile(OPTIONS_FILE_TEXT6, new String[] {}); - Assert.assertTrue(false); - } catch (Exception ex) { - Assert.assertTrue( - ex.getMessage().startsWith("Multiline quoted strings not supported")); - } - } - - @Test - public void testValidFreeFormQueryNoQuotes() throws Exception { - String[] input = new String[]{ - "--query", - "SELECT * FROM table", - }; - - String[] output = new String[] { - "--query", - "SELECT * FROM table", - }; - - checkOptionsFile(input, output); - } - - @Test - public void testValidFreeFormQuerySingleQuotesStartAndEnd() throws Exception { - String[] input = new String[]{ - "--query", - "'SELECT * FROM table'", - }; - - String[] output = new String[]{ - "--query", - "SELECT * FROM table", - }; - - checkOptionsFile(input, output); - } - - @Test - public void testValidFreeFormQueryDoubleQuotesStartAndEnd() throws Exception { - String[] input = new String[]{ - "--query", - "\"SELECT * FROM table\"", - }; - - String[] output = new String[]{ - "--query", - "SELECT * FROM table", - }; - - checkOptionsFile(input, output); - } - - @Test - public void testValidFreeFormQuerySingleQuotesInWhere() throws Exception { - String[] input = new String[]{ - "--query", - "SELECT * FROM table WHERE a = '1'", - }; - - String[] output = new String[]{ - "--query", - "SELECT * FROM table WHERE a = '1'", - }; - - checkOptionsFile(input, output); - } - - @Test - public void testValidFreeFormQuerySingleAndDoubleQuotesInWhere() throws Exception { - String[] input = new String[] { - "--query", - "SELECT * FROM table WHERE a = '1' AND b = \"testing\"", - }; - - String[] output = new String[] { - "--query", - "SELECT * FROM table WHERE a = '1' AND b = \"testing\"", - }; - - checkOptionsFile(input, output); - } - - @Test - public void testValidFreeFormQueryQuotesInTableNameAndColumnName() throws Exception { - String[] input = new String[] { - "--query", - "select * from `test\"test` where `c'c` = 'a'", - }; - - String[] output = new String[] { - "--query", - "select * from `test\"test` where `c'c` = 'a'", - }; - - checkOptionsFile(input, output); - } - - @Test - public void testValidFreeFormQueryQuotesInTableNameAndColumnName2() throws Exception { - String[] input = new String[] { - "--query", - "select * from `test\"test` where `c'c` = 'a\"'", - }; - - String[] output = new String[] { - "--query", - "select * from 
`test\"test` where `c'c` = 'a\"'", - }; - - checkOptionsFile(input, output); - } - - @Test - public void testValidFreeFormQueryQuotesInTableNameAndColumnName3() throws Exception { - String[] input = new String[] { - "--query", - "select * from `test\"test` where `c'c` = \"\"", - }; - - String[] output = new String[] { - "--query", - "select * from `test\"test` where `c'c` = \"\"", - }; - - checkOptionsFile(input, output); - } - - @Test - public void testValidFreeFormQueryQuotesInTableNameAndColumnName4() throws Exception { - String[] input = new String[] { - "--query", - "select * from test where a = \"\\\"\"", - }; - - String[] output = new String[] { - "--query", - "select * from test where a = \"\\\"\"", - }; - - checkOptionsFile(input, output); - } - - @Test - public void testInvalidFreeFormQueryEndingSingleQuoteOnly() throws Exception { - String[] input = new String[]{ - "--query", - "SELECT * FROM table'", - }; - - checkInvalidOptionsFile(input); - } - - @Test - public void testInvalidFreeFormQuerySingleQuoteStartDoubleQuoteEnd() throws Exception { - - String[] input = new String[]{ - "--query", - "'SELECT * FROM table\"", - }; - - checkInvalidOptionsFile(input); - } - - private void checkInvalidOptionsFile(String[] fileContents) { - try { - checkOptionsFile(fileContents, new String[] {}); - Assert.assertTrue(false); - } catch (Exception ex) { - Assert.assertTrue(ex.getMessage().startsWith("Malformed option")); - } - } - - private void checkOptionsFile(String[] fileContent, String[] expectedOptions) - throws Exception { - String[] prefix0 = new String[] { }; - String[] suffix0 = new String[] { }; - - checkOutput(prefix0, suffix0, fileContent, expectedOptions); - - String[] prefix1 = new String[] { "--nomnom" }; - String[] suffix1 = new String[] { }; - - checkOutput(prefix1, suffix1, - fileContent, expectedOptions); - - String[] prefix2 = new String[] { }; - String[] suffix2 = new String[] { "yIkes" }; - - checkOutput(prefix2, suffix2, - fileContent, expectedOptions); - - String[] prefix3 = new String[] { "foo", "bar" }; - String[] suffix3 = new String[] { "xyz", "abc" }; - - checkOutput(prefix3, suffix3, - fileContent, expectedOptions); - } - - - /** - * Uses the given prefix and suffix to create the original args array which - * contains two entries between the prefix and suffix entries that specify - * the options file. The options file is dynamically created using the - * contents of the third array - fileContent. Once this is expanded, the - * expanded arguments are compared to see if they are same as prefix entries - * followed by parsed arguments from the options file, followed by suffix - * entries. 
- * @param prefix - * @param suffix - * @param fileContent - * @param expectedContent - * @throws Exception - */ - private void checkOutput(String[] prefix, String[] suffix, - String[] fileContent, String[] expectedContent) throws Exception { - - String[] args = new String[prefix.length + 2 + suffix.length]; - - for (int i = 0; i < prefix.length; i++) { - args[i] = prefix[i]; - } - args[prefix.length] = Sqoop.SQOOP_OPTIONS_FILE_SPECIFIER; - args[prefix.length + 1] = createOptionsFile(fileContent); - for (int j = 0; j < suffix.length; j++) { - args[j + 2 + prefix.length] = suffix[j]; - } - - String[] expandedArgs = OptionsFileUtil.expandArguments(args); - - assertSame(prefix, expectedContent, suffix, expandedArgs); - - } - - private void assertSame(String[] prefix, String[] content, String[] suffix, - String[] actual) { - Assert.assertTrue(prefix.length + content.length + suffix.length - == actual.length); - - for (int i = 0; i < prefix.length; i++) { - Assert.assertTrue(actual[i].equals(prefix[i])); - } - - for (int i = 0; i < content.length; i++) { - Assert.assertTrue(actual[i + prefix.length].equals(content[i])); - } - - for (int i = 0; i < suffix.length; i++) { - Assert.assertTrue(actual[i + prefix.length + content.length].equals( - suffix[i])); - } - } - - private String createOptionsFile(String[] data) throws Exception { - File file = File.createTempFile("options", ".opf"); - file.deleteOnExit(); - - BufferedWriter writer = null; - try { - writer = new BufferedWriter(new FileWriter(file)); - for (String datum : data) { - writer.write(datum); - writer.newLine(); - } - } finally { - if (writer != null) { - try { - writer.close(); - } catch (IOException ex) { - // No handling required - } - } - } - - return file.getAbsolutePath(); - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/com/cloudera/sqoop/util/TestSubstitutionUtils.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/util/TestSubstitutionUtils.java b/src/test/com/cloudera/sqoop/util/TestSubstitutionUtils.java deleted file mode 100644 index b6b072f..0000000 --- a/src/test/com/cloudera/sqoop/util/TestSubstitutionUtils.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.cloudera.sqoop.util; - -import org.apache.sqoop.util.SubstitutionUtils; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -/** - * - */ -public class TestSubstitutionUtils { - - @Test - public void testRemoveEscapeCharacters() { - assertEquals("\\N", SubstitutionUtils.removeEscapeCharacters("\\\\N")); - assertEquals("\n", SubstitutionUtils.removeEscapeCharacters("\\n")); - assertEquals("\b", SubstitutionUtils.removeEscapeCharacters("\\b")); - assertEquals("\t", SubstitutionUtils.removeEscapeCharacters("\\t")); - assertEquals("\f", SubstitutionUtils.removeEscapeCharacters("\\f")); - assertEquals("\'", SubstitutionUtils.removeEscapeCharacters("\\'")); - assertEquals("\"", SubstitutionUtils.removeEscapeCharacters("\\\"")); - assertEquals("sqoop", SubstitutionUtils.removeEscapeCharacters("sqoop")); - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/findbugsExcludeFile.xml ---------------------------------------------------------------------- diff --git a/src/test/findbugsExcludeFile.xml b/src/test/findbugsExcludeFile.xml deleted file mode 100644 index 8aa4ed5..0000000 --- a/src/test/findbugsExcludeFile.xml +++ /dev/null @@ -1,78 +0,0 @@ -<?xml version="1.0"?> - -<!-- - Copyright 2011 The Apache Software Foundation - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> - - -<!-- - This file enumerates all the findbugs warnings that we want to suppress. - If you add a spurious warning, you should add it to this file so that it - does not generate warnings in the official report. - - For each exception you add, include a comment in your <Match> block - explaining why this is not a bug. ---> -<FindBugsFilter> - <Match> - <!-- SQL db can return null for a boolean column; so can we. --> - <Class name="com.cloudera.sqoop.lib.JdbcWritableBridge" /> - <Method name="readBoolean" /> - <Bug pattern="NP_BOOLEAN_RETURN_NULL" /> - </Match> - <Match> - <!-- This mapper intentionally triggers an NPE to cause an exception - which the test case must catch. --> - <Class name="com.cloudera.sqoop.mapreduce.TestImportJob$NullDereferenceMapper" /> - <Method name="map" /> - <Bug pattern="NP_ALWAYS_NULL" /> - </Match> - <Match> - <!-- createRootTable() allows a user-specified table name retrieved - from properties. This single instance is allowed for now. - --> - <Class name="com.cloudera.sqoop.metastore.GenericJobStorage" /> - <Method name="createRootTable" /> - <Bug pattern="SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE" /> - </Match> - - <!-- The following broad categories suppress warnings in test code that do - not need to be rigidly upheld. --> - <Match> - <!-- Performance warnings are ignored in test code.
--> - <Class name="~com\.cloudera\.sqoop\..*Test.*" /> - <Bug category="PERFORMANCE" /> - </Match> - <Match> - <!-- More performance warnings to suppress in tests. --> - <Class name="~com\.cloudera\.sqoop\..*Test.*" /> - <Bug pattern="SBSC_USE_STRINGBUFFER_CONCATENATION" /> - </Match> - <Match> - <!-- Security warnings are ignored in test code. --> - <Class name="~com\.cloudera\.sqoop\..*Test.*" /> - <Bug category="SECURITY" /> - </Match> - <Match> - <!-- Ok to use methods to generate SQL statements in tests. --> - <Class name="~com\.cloudera\.sqoop\..*Test.*" /> - <Bug pattern="SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING" /> - </Match> - -</FindBugsFilter> http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/TestAllTables.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/TestAllTables.java b/src/test/org/apache/sqoop/TestAllTables.java new file mode 100644 index 0000000..56d1f57 --- /dev/null +++ b/src/test/org/apache/sqoop/TestAllTables.java @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop; + +import java.io.*; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.junit.Before; +import org.junit.After; + +import org.apache.sqoop.testutil.CommonArgs; +import org.apache.sqoop.testutil.ImportJobTestCase; +import org.apache.sqoop.tool.ImportAllTablesTool; +import org.junit.Test; +import org.kitesdk.data.Dataset; +import org.kitesdk.data.DatasetReader; +import org.kitesdk.data.Datasets; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; + +/** + * Test the --all-tables functionality that can import multiple tables. + */ +public class TestAllTables extends ImportJobTestCase { + + /** + * Create the argv to pass to Sqoop. + * @return the argv as an array of strings. 
+ */ + private String [] getArgv(String[] extraArgs, String[] excludeTables) { + ArrayList<String> args = new ArrayList<String>(); + + CommonArgs.addHadoopFlags(args); + args.add("--warehouse-dir"); + args.add(getWarehouseDir()); + args.add("--connect"); + args.add(getConnectString()); + args.add("--num-mappers"); + args.add("1"); + args.add("--escaped-by"); + args.add("\\"); + if (excludeTables != null) { + args.add("--exclude-tables"); + args.add(StringUtils.join(excludeTables, ",")); + } + if (extraArgs != null) { + for (String arg : extraArgs) { + args.add(arg); + } + } + + return args.toArray(new String[0]); + } + + /** the names of the tables we're creating. */ + private List<String> tableNames; + + /** The strings to inject in the (ordered) tables. */ + private List<String> expectedStrings; + + @Before + public void setUp() { + // start the server + super.setUp(); + + if (useHsqldbTestServer()) { + // throw away TWOINTTABLE and things we don't care about. + try { + this.getTestServer().dropExistingSchema(); + } catch (SQLException sqlE) { + fail(sqlE.toString()); + } + } + + this.tableNames = new ArrayList<String>(); + this.expectedStrings = new ArrayList<String>(); + + // create two tables. + this.expectedStrings.add("A winner"); + this.expectedStrings.add("is you!"); + this.expectedStrings.add(null); + + int i = 0; + for (String expectedStr: this.expectedStrings) { + String wrappedStr = null; + if (expectedStr != null) { + wrappedStr = "'" + expectedStr + "'"; + } + + String [] types = { "INT NOT NULL PRIMARY KEY", "VARCHAR(32)" }; + String [] vals = { Integer.toString(i++) , wrappedStr }; + this.createTableWithColTypes(types, vals); + this.tableNames.add(this.getTableName()); + this.removeTableDir(); + incrementTableNum(); + } + } + + @After + public void tearDown() { + try { + for (String table : tableNames) { + dropTableIfExists(table); + } + } catch(SQLException e) { + LOG.error("Can't clean up the database:", e); + } + super.tearDown(); + } + + @Test + public void testMultiTableImport() throws IOException { + String [] argv = getArgv(null, null); + runImport(new ImportAllTablesTool(), argv); + + Path warehousePath = new Path(this.getWarehouseDir()); + int i = 0; + for (String tableName : this.tableNames) { + Path tablePath = new Path(warehousePath, tableName); + Path filePath = new Path(tablePath, "part-m-00000"); + + // dequeue the expected value for this table. This + // list has the same order as the tableNames list. 
+ String expectedVal = Integer.toString(i++) + "," + + this.expectedStrings.get(0); + this.expectedStrings.remove(0); + + BufferedReader reader = null; + if (!isOnPhysicalCluster()) { + reader = new BufferedReader( + new InputStreamReader(new FileInputStream( + new File(filePath.toString())))); + } else { + FileSystem dfs = FileSystem.get(getConf()); + FSDataInputStream dis = dfs.open(filePath); + reader = new BufferedReader(new InputStreamReader(dis)); + } + try { + String line = reader.readLine(); + assertEquals("Table " + tableName + " expected a different string", + expectedVal, line); + } finally { + IOUtils.closeStream(reader); + } + } + } + + @Test + public void testMultiTableImportAsParquetFormat() throws IOException { + String [] argv = getArgv(new String[]{"--as-parquetfile"}, null); + runImport(new ImportAllTablesTool(), argv); + + Path warehousePath = new Path(this.getWarehouseDir()); + int i = 0; + for (String tableName : this.tableNames) { + Path tablePath = new Path(warehousePath, tableName); + Dataset dataset = Datasets.load("dataset:file:" + tablePath); + + // dequeue the expected value for this table. This + // list has the same order as the tableNames list. + String expectedVal = Integer.toString(i++) + "," + + this.expectedStrings.get(0); + this.expectedStrings.remove(0); + + DatasetReader<GenericRecord> reader = dataset.newReader(); + try { + GenericRecord record = reader.next(); + String line = record.get(0) + "," + record.get(1); + assertEquals("Table " + tableName + " expected a different string", + expectedVal, line); + assertFalse(reader.hasNext()); + } finally { + reader.close(); + } + } + } + + @Test + public void testMultiTableImportWithExclude() throws IOException { + String exclude = this.tableNames.get(0); + String [] argv = getArgv(null, new String[]{ exclude }); + runImport(new ImportAllTablesTool(), argv); + + Path warehousePath = new Path(this.getWarehouseDir()); + int i = 0; + for (String tableName : this.tableNames) { + Path tablePath = new Path(warehousePath, tableName); + Path filePath = new Path(tablePath, "part-m-00000"); + + // dequeue the expected value for this table. This + // list has the same order as the tableNames list. 
+ String expectedVal = Integer.toString(i++) + "," + + this.expectedStrings.get(0); + this.expectedStrings.remove(0); + + BufferedReader reader = null; + if (!isOnPhysicalCluster()) { + reader = new BufferedReader( + new InputStreamReader(new FileInputStream( + new File(filePath.toString())))); + } else { + FSDataInputStream dis; + FileSystem dfs = FileSystem.get(getConf()); + if (tableName.equals(exclude)) { + try { + dis = dfs.open(filePath); + assertFalse(true); + } catch (FileNotFoundException e) { + // Success + continue; + } + } else { + dis = dfs.open(filePath); + } + reader = new BufferedReader(new InputStreamReader(dis)); + } + try { + String line = reader.readLine(); + assertEquals("Table " + tableName + " expected a different string", + expectedVal, line); + } finally { + IOUtils.closeStream(reader); + } + } + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/TestAppendUtils.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/TestAppendUtils.java b/src/test/org/apache/sqoop/TestAppendUtils.java new file mode 100644 index 0000000..f14fc6a --- /dev/null +++ b/src/test/org/apache/sqoop/TestAppendUtils.java @@ -0,0 +1,315 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.StringUtils; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.sqoop.manager.ImportJobContext; + +import org.apache.sqoop.testutil.CommonArgs; +import org.apache.sqoop.testutil.HsqldbTestServer; +import org.apache.sqoop.testutil.ImportJobTestCase; +import org.apache.sqoop.tool.ImportTool; +import org.apache.sqoop.util.AppendUtils; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Test that --append works. + */ +public class TestAppendUtils extends ImportJobTestCase { + + private static final int PARTITION_DIGITS = 5; + private static final String FILEPART_SEPARATOR = "-"; + + public static final Log LOG = LogFactory.getLog(TestAppendUtils.class + .getName()); + + /** + * Create the argv to pass to Sqoop. + * + * @return the argv as an array of strings. 
+ */ + protected ArrayList getOutputlessArgv(boolean includeHadoopFlags, boolean queryBased, + String[] colNames, Configuration conf) { + if (null == colNames) { + colNames = getColNames(); + } + + String splitByCol = colNames[0]; + String columnsString = ""; + for (String col : colNames) { + columnsString += col + ","; + } + + ArrayList<String> args = new ArrayList<String>(); + + if (includeHadoopFlags) { + CommonArgs.addHadoopFlags(args); + } + + if(queryBased) { + args.add("--query"); + args.add("SELECT * FROM " + getTableName() + " WHERE $CONDITIONS"); + } else { + args.add("--table"); + args.add(getTableName()); + } + args.add("--columns"); + args.add(columnsString); + args.add("--split-by"); + args.add(splitByCol); + args.add("--connect"); + args.add(getConnectString()); + args.add("--as-sequencefile"); + args.add("--num-mappers"); + args.add("1"); + + args.addAll(getExtraArgs(conf)); + + return args; + } + + // this test just uses the two int table. + protected String getTableName() { + return HsqldbTestServer.getTableName(); + } + + /** the same as ImportJobTestCase but without removing tabledir. */ + protected void runUncleanImport(String[] argv) throws IOException { + // run the tool through the normal entry-point. + int ret; + try { + Configuration conf = getConf(); + SqoopOptions opts = getSqoopOptions(conf); + Sqoop sqoop = new Sqoop(new ImportTool(), conf, opts); + ret = Sqoop.runSqoop(sqoop, argv); + } catch (Exception e) { + LOG.error("Got exception running Sqoop: " + e.toString()); + e.printStackTrace(); + ret = 1; + } + + // expect a successful return. + if (0 != ret) { + throw new IOException("Failure during job; return status " + ret); + } + } + + /** @return FileStatus for data files only. */ + private FileStatus[] listFiles(FileSystem fs, Path path) throws IOException { + FileStatus[] fileStatuses = fs.listStatus(path); + ArrayList files = new ArrayList(); + Pattern patt = Pattern.compile("part.*-([0-9][0-9][0-9][0-9][0-9]).*"); + for (FileStatus fstat : fileStatuses) { + String fname = fstat.getPath().getName(); + if (!fstat.isDir()) { + Matcher mat = patt.matcher(fname); + if (mat.matches()) { + files.add(fstat); + } + } + } + return (FileStatus[]) files.toArray(new FileStatus[files.size()]); + } + + private class StatusPathComparator implements Comparator<FileStatus> { + + @Override + public int compare(FileStatus fs1, FileStatus fs2) { + return fs1.getPath().toString().compareTo(fs2.getPath().toString()); + } + } + + /** @return a concat. string with file-creation dates excluding folders. */ + private String getFileCreationTimeImage(FileSystem fs, Path outputPath, + int fileCount) throws IOException { + // create string image with all file creation dates + StringBuffer image = new StringBuffer(); + FileStatus[] fileStatuses = listFiles(fs, outputPath); + // sort the file statuses by path so we have a stable order for + // using 'fileCount'.
+ Arrays.sort(fileStatuses, new StatusPathComparator()); + for (int i = 0; i < fileStatuses.length && i < fileCount; i++) { + image.append(fileStatuses[i].getPath() + "=" + + fileStatuses[i].getModificationTime()); + } + return image.toString(); + } + + /** @return the number part of a partition */ + private int getFilePartition(Path file) { + String filename = file.getName(); + int pos = filename.lastIndexOf(FILEPART_SEPARATOR); + if (pos != -1) { + String part = filename.substring(pos + 1, pos + 1 + PARTITION_DIGITS); + return Integer.parseInt(part); + } else { + return 0; + } + } + + /** + * Test for output path file-count increase, current files untouched and new + * correct partition number. + * + * @throws IOException + */ + public void runAppendTest(ArrayList args, Path outputPath) + throws IOException { + + try { + + // ensure non-existing output dir for insert phase + FileSystem fs = FileSystem.get(getConf()); + if (fs.exists(outputPath)) { + fs.delete(outputPath, true); + } + + // run Sqoop in INSERT mode + String[] argv = (String[]) args.toArray(new String[0]); + runUncleanImport(argv); + + // get current file count + FileStatus[] fileStatuses = listFiles(fs, outputPath); + Arrays.sort(fileStatuses, new StatusPathComparator()); + int previousFileCount = fileStatuses.length; + + // get string image with all file creation dates + String previousImage = getFileCreationTimeImage(fs, outputPath, + previousFileCount); + + // get current last partition number + Path lastFile = fileStatuses[fileStatuses.length - 1].getPath(); + int lastPartition = getFilePartition(lastFile); + + // run Sqoop in APPEND mode + args.add("--append"); + argv = (String[]) args.toArray(new String[0]); + runUncleanImport(argv); + + // check directory file increase + fileStatuses = listFiles(fs, outputPath); + Arrays.sort(fileStatuses, new StatusPathComparator()); + int currentFileCount = fileStatuses.length; + assertTrue("Output directory file count did not increase ", + currentFileCount > previousFileCount); + + // check previous files weren't modified, also works for partition + // overlapping + String currentImage = getFileCreationTimeImage(fs, outputPath, + previousFileCount); + assertEquals("Files present before the append operation were modified", + currentImage, previousImage); + + // check that at least 1 new consecutive partition exists + // let's use a different way than the code being tested + Path newFile = fileStatuses[previousFileCount].getPath(); // there is a + // new bound now + int newPartition = getFilePartition(newFile); + assertTrue("New partition file isn't consecutive", + lastPartition + 1 == newPartition); + + } catch (Exception e) { + LOG.error("Got Exception: " + StringUtils.stringifyException(e)); + fail(e.toString()); + } + } + + /** independent of target-dir. */ + @Test + public void testAppend() throws IOException { + ArrayList args = getOutputlessArgv(false, false, HsqldbTestServer.getFieldNames(), getConf()); + args.add("--warehouse-dir"); + args.add(getWarehouseDir()); + + Path output = new Path(getWarehouseDir(), HsqldbTestServer.getTableName()); + runAppendTest(args, output); + } + + /** working with target-dir. */ + @Test + public void testAppendToTargetDir() throws IOException { + ArrayList args = getOutputlessArgv(false, false, HsqldbTestServer.getFieldNames(), getConf()); + String targetDir = getWarehouseDir() + "/tempTargetDir"; + args.add("--target-dir"); + args.add(targetDir); + + // there's no need for a new param: + // unlike
--warehouse-dir, there will not be a $tablename dir + Path output = new Path(targetDir); + runAppendTest(args, output); + } + + /** + * Query based import should also work in append mode. + * + * @throws IOException + */ + @Test + public void testAppendWithQuery() throws IOException { + ArrayList args = getOutputlessArgv(false, true, HsqldbTestServer.getFieldNames(), getConf()); + String targetDir = getWarehouseDir() + "/tempTargetDir"; + args.add("--target-dir"); + args.add(targetDir); + + Path output = new Path(targetDir); + runAppendTest(args, output); + } + + /** + * If the append source does not exist, don't crash. + */ + @Test + public void testAppendSrcDoesNotExist() throws IOException { + Configuration conf = new Configuration(); + if (!isOnPhysicalCluster()) { + conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS); + } + SqoopOptions options = new SqoopOptions(conf); + options.setTableName("meep"); + Path missingPath = new Path("doesNotExistForAnyReason"); + FileSystem local = FileSystem.getLocal(conf); + assertFalse(local.exists(missingPath)); + ImportJobContext importContext = new ImportJobContext("meep", null, + options, missingPath); + AppendUtils utils = new AppendUtils(importContext); + utils.append(); + } + +} + http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/TestAutoResetMapper.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/TestAutoResetMapper.java b/src/test/org/apache/sqoop/TestAutoResetMapper.java index fd29c2d..1ad9c33 100644 --- a/src/test/org/apache/sqoop/TestAutoResetMapper.java +++ b/src/test/org/apache/sqoop/TestAutoResetMapper.java @@ -36,7 +36,7 @@ import org.apache.sqoop.tool.ImportAllTablesTool; import org.junit.After; import org.junit.Before; -import com.cloudera.sqoop.testutil.ImportJobTestCase; +import org.apache.sqoop.testutil.ImportJobTestCase; import org.junit.Test; import static org.junit.Assert.assertEquals; http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/TestAvroExport.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/TestAvroExport.java b/src/test/org/apache/sqoop/TestAvroExport.java new file mode 100644 index 0000000..d1f1054 --- /dev/null +++ b/src/test/org/apache/sqoop/TestAvroExport.java @@ -0,0 +1,536 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.sqoop; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import org.apache.sqoop.testutil.BaseSqoopTestCase; +import org.apache.sqoop.testutil.CommonArgs; +import org.apache.sqoop.testutil.ExportJobTestCase; +import com.google.common.collect.Lists; + +import java.io.IOException; +import java.io.OutputStream; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; + +import org.apache.avro.Conversions; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.Test; +import org.junit.Rule; +import org.junit.rules.ExpectedException; + +/** + * Test that we can export Avro Data Files from HDFS into databases. + */ + +public class TestAvroExport extends ExportJobTestCase { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + /** + * @return an argv for the CodeGenTool to use when creating tables to export. + */ + protected String [] getCodeGenArgv(String... extraArgs) { + List<String> codeGenArgv = new ArrayList<String>(); + + if (null != extraArgs) { + for (String arg : extraArgs) { + codeGenArgv.add(arg); + } + } + + codeGenArgv.add("--table"); + codeGenArgv.add(getTableName()); + codeGenArgv.add("--connect"); + codeGenArgv.add(getConnectString()); + + return codeGenArgv.toArray(new String[0]); + } + + /** When generating data for export tests, each column is generated + according to a ColumnGenerator. Methods exist for determining + what to put into Avro objects in the files to export, as well + as what the object representation of the column as returned by + the database should look like. + */ + public interface ColumnGenerator { + /** For a row with id rowNum, what should we write into that + Avro record to export? + */ + Object getExportValue(int rowNum); + + /** Return the Avro schema for the field. */ + Schema getColumnAvroSchema(); + + /** For a row with id rowNum, what should the database return + for the given column's value? + */ + Object getVerifyValue(int rowNum); + + /** Return the column type to put in the CREATE TABLE statement. */ + String getColumnType(); + } + + private ColumnGenerator colGenerator(final Object exportValue, + final Schema schema, final Object verifyValue, + final String columnType) { + return new ColumnGenerator() { + @Override + public Object getVerifyValue(int rowNum) { + return verifyValue; + } + @Override + public Object getExportValue(int rowNum) { + return exportValue; + } + @Override + public String getColumnType() { + return columnType; + } + @Override + public Schema getColumnAvroSchema() { + return schema; + } + }; + } + + /** + * Create a data file that gets exported to the db. + * @param fileNum the number of the file (for multi-file export) + * @param numRecords how many records to write to the file. + */ + protected void createAvroFile(int fileNum, int numRecords, + ColumnGenerator... 
extraCols) throws IOException { + + Path tablePath = getTablePath(); + Path filePath = new Path(tablePath, "part" + fileNum); + + Configuration conf = new Configuration(); + if (!BaseSqoopTestCase.isOnPhysicalCluster()) { + conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS); + } + FileSystem fs = FileSystem.get(conf); + fs.mkdirs(tablePath); + OutputStream os = fs.create(filePath); + + Schema schema = buildAvroSchema(extraCols); + DatumWriter<GenericRecord> datumWriter = + new GenericDatumWriter<GenericRecord>(); + DataFileWriter<GenericRecord> dataFileWriter = + new DataFileWriter<GenericRecord>(datumWriter); + dataFileWriter.create(schema, os); + + for (int i = 0; i < numRecords; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put("id", i); + record.put("msg", getMsgPrefix() + i); + addExtraColumns(record, i, extraCols); + dataFileWriter.append(record); + } + + dataFileWriter.close(); + os.close(); + } + + private Schema buildAvroSchema(ColumnGenerator... extraCols) { + List<Field> fields = new ArrayList<Field>(); + fields.add(buildAvroField("id", Schema.Type.INT)); + fields.add(buildAvroField("msg", Schema.Type.STRING)); + int colNum = 0; + // Issue [SQOOP-2846] + if (null != extraCols) { + for (ColumnGenerator gen : extraCols) { + if (gen.getColumnAvroSchema() != null) { + fields.add(buildAvroField(forIdx(colNum++), gen.getColumnAvroSchema())); + } + } + } + Schema schema = Schema.createRecord("myschema", null, null, false); + schema.setFields(fields); + return schema; + } + + private void addExtraColumns(GenericRecord record, int rowNum, + ColumnGenerator[] extraCols) { + int colNum = 0; + // Issue [SQOOP-2846] + if (null != extraCols) { + for (ColumnGenerator gen : extraCols) { + if (gen.getColumnAvroSchema() != null) { + record.put(forIdx(colNum++), gen.getExportValue(rowNum)); + } + } + } + } + + private Field buildAvroField(String name, Schema.Type type) { + return new Field(name, Schema.create(type), null, null); + } + + private Field buildAvroField(String name, Schema schema) { + return new Field(name, schema, null, null); + } + + /** Return the column name for a column index. + * Each table contains two columns named 'id' and 'msg', and then an + * arbitrary number of additional columns defined by ColumnGenerators. + * These columns are referenced by idx 0, 1, 2... + * @param idx the index of the ColumnGenerator in the array passed to + * createTable(). + * @return the name of the column + */ + protected String forIdx(int idx) { + return "col" + idx; + } + + /** + * Return a SQL statement that drops a table, if it exists. + * @param tableName the table to drop. + * @return the SQL statement to drop that table. + */ + protected String getDropTableStatement(String tableName) { + return "DROP TABLE " + tableName + " IF EXISTS"; + } + + /** Create the table definition to export to, removing any prior table. + By specifying ColumnGenerator arguments, you can add extra columns + to the table of arbitrary type. + */ + private void createTable(ColumnGenerator... 
extraColumns) + throws SQLException { + Connection conn = getConnection(); + PreparedStatement statement = conn.prepareStatement( + getDropTableStatement(getTableName()), + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + try { + statement.executeUpdate(); + conn.commit(); + } finally { + statement.close(); + } + + StringBuilder sb = new StringBuilder(); + sb.append("CREATE TABLE "); + sb.append(getTableName()); + sb.append(" (id INT NOT NULL PRIMARY KEY, msg VARCHAR(64)"); + int colNum = 0; + for (ColumnGenerator gen : extraColumns) { + if (gen.getColumnType() != null) { + sb.append(", " + forIdx(colNum++) + " " + gen.getColumnType()); + } + } + sb.append(")"); + + statement = conn.prepareStatement(sb.toString(), + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + try { + statement.executeUpdate(); + conn.commit(); + } finally { + statement.close(); + } + } + + /** + * Create the table definition to export and also insert one record for + * identifying the updates. Issue [SQOOP-2846] + */ + private void createTableWithInsert() throws SQLException { + Connection conn = getConnection(); + PreparedStatement statement = conn.prepareStatement(getDropTableStatement(getTableName()), + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + try { + statement.executeUpdate(); + conn.commit(); + } finally { + statement.close(); + } + + StringBuilder sb = new StringBuilder(); + sb.append("CREATE TABLE "); + sb.append(getTableName()); + sb.append(" (id INT NOT NULL PRIMARY KEY, msg VARCHAR(64)"); + sb.append(")"); + statement = conn.prepareStatement(sb.toString(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + try { + statement.executeUpdate(); + Statement statement2 = conn.createStatement(); + String insertCmd = "INSERT INTO " + getTableName() + " (ID,MSG) VALUES(" + 0 + ",'testMsg');"; + statement2.execute(insertCmd); + conn.commit(); + } finally { + statement.close(); + } + } + + + /** Verify that on a given row, a column has a given value. + * @param id the id column specifying the row to test. + */ + private void assertColValForRowId(int id, String colName, Object expectedVal) + throws SQLException { + Connection conn = getConnection(); + LOG.info("Verifying column " + colName + " has value " + expectedVal); + + PreparedStatement statement = conn.prepareStatement( + "SELECT " + colName + " FROM " + getTableName() + " WHERE ID = " + id, + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + Object actualVal = null; + try { + ResultSet rs = statement.executeQuery(); + try { + rs.next(); + actualVal = rs.getObject(1); + } finally { + rs.close(); + } + } finally { + statement.close(); + } + + if (expectedVal != null && expectedVal instanceof byte[]) { + assertArrayEquals((byte[]) expectedVal, (byte[]) actualVal); + } else { + assertEquals("Got unexpected column value", expectedVal, actualVal); + } + } + + /** Verify that for the max and min values of the 'id' column, the values + for a given column meet the expected values.
+ */ + protected void assertColMinAndMax(String colName, ColumnGenerator generator) + throws SQLException { + Connection conn = getConnection(); + int minId = getMinRowId(conn); + int maxId = getMaxRowId(conn); + + LOG.info("Checking min/max for column " + colName + " with type " + + generator.getColumnType()); + + Object expectedMin = generator.getVerifyValue(minId); + Object expectedMax = generator.getVerifyValue(maxId); + + assertColValForRowId(minId, colName, expectedMin); + assertColValForRowId(maxId, colName, expectedMax); + } + + @Test + public void testSupportedAvroTypes() throws IOException, SQLException { + GenericData.get().addLogicalTypeConversion(new Conversions.DecimalConversion()); + + String[] argv = {}; + final int TOTAL_RECORDS = 1 * 10; + + byte[] b = new byte[] { (byte) 1, (byte) 2 }; + Schema fixed = Schema.createFixed("myfixed", null, null, 2); + Schema enumeration = Schema.createEnum("myenum", null, null, + Lists.newArrayList("a", "b")); + Schema decimalSchema = LogicalTypes.decimal(3,2) + .addToSchema(Schema.createFixed("dec1", null, null, 2)); + + ColumnGenerator[] gens = new ColumnGenerator[] { + colGenerator(true, Schema.create(Schema.Type.BOOLEAN), true, "BIT"), + colGenerator(100, Schema.create(Schema.Type.INT), 100, "INTEGER"), + colGenerator(200L, Schema.create(Schema.Type.LONG), 200L, "BIGINT"), + // HSQLDB maps REAL to double, not float: + colGenerator(1.0f, Schema.create(Schema.Type.FLOAT), 1.0d, "REAL"), + colGenerator(2.0d, Schema.create(Schema.Type.DOUBLE), 2.0d, "DOUBLE"), + colGenerator("s", Schema.create(Schema.Type.STRING), "s", "VARCHAR(8)"), + colGenerator(ByteBuffer.wrap(b), Schema.create(Schema.Type.BYTES), + b, "VARBINARY(8)"), + colGenerator(new GenericData.Fixed(fixed, b), fixed, + b, "BINARY(2)"), + colGenerator(new GenericData.EnumSymbol(enumeration, "a"), enumeration, + "a", "VARCHAR(8)"), + colGenerator(new BigDecimal("2.00"), decimalSchema, + new BigDecimal("2.00"), "DECIMAL(3,2)"), + colGenerator("22.00", Schema.create(Schema.Type.STRING), + new BigDecimal("22.00"), "DECIMAL(4,2)"), + }; + createAvroFile(0, TOTAL_RECORDS, gens); + createTable(gens); + runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1))); + verifyExport(TOTAL_RECORDS); + for (int i = 0; i < gens.length; i++) { + assertColMinAndMax(forIdx(i), gens[i]); + } + } + + @Test + public void testPathPatternInExportDir() throws IOException, SQLException { + final int TOTAL_RECORDS = 10; + + ColumnGenerator[] gens = new ColumnGenerator[] { + colGenerator(true, Schema.create(Schema.Type.BOOLEAN), true, "BIT"), + }; + + createAvroFile(0, TOTAL_RECORDS, gens); + createTable(gens); + + // Converts path to an unary set while preserving the leading '/' + String pathPattern = new StringBuilder(getTablePath().toString()) + .insert(1, "{") + .append("}") + .toString(); + + runExport(getArgv(true, 10, 10, "--export-dir", pathPattern)); + verifyExport(TOTAL_RECORDS); + } + + @Test + public void testNullableField() throws IOException, SQLException { + String[] argv = {}; + final int TOTAL_RECORDS = 1 * 10; + + List<Schema> childSchemas = new ArrayList<Schema>(); + childSchemas.add(Schema.create(Schema.Type.NULL)); + childSchemas.add(Schema.create(Schema.Type.STRING)); + Schema schema = Schema.createUnion(childSchemas); + ColumnGenerator gen0 = colGenerator(null, schema, null, "VARCHAR(64)"); + ColumnGenerator gen1 = colGenerator("s", schema, "s", "VARCHAR(64)"); + createAvroFile(0, TOTAL_RECORDS, gen0, gen1); + createTable(gen0, gen1); + runExport(getArgv(true, 10, 10, 
newStrArray(argv, "-m", "" + 1))); + verifyExport(TOTAL_RECORDS); + assertColMinAndMax(forIdx(0), gen0); + assertColMinAndMax(forIdx(1), gen1); + } + + @Test + public void testAvroRecordsNotSupported() throws IOException, SQLException { + String[] argv = {}; + final int TOTAL_RECORDS = 1; + + Schema schema = Schema.createRecord("nestedrecord", null, null, false); + schema.setFields(Lists.newArrayList(buildAvroField("myint", + Schema.Type.INT))); + GenericRecord record = new GenericData.Record(schema); + record.put("myint", 100); + // DB type is not used so can be anything: + ColumnGenerator gen = colGenerator(record, schema, null, "VARCHAR(64)"); + createAvroFile(0, TOTAL_RECORDS, gen); + createTable(gen); + + thrown.expect(Exception.class); + thrown.reportMissingExceptionWithMessage("Expected Exception as Avro records are not supported"); + runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1))); + } + + @Test + public void testMissingDatabaseFields() throws IOException, SQLException { + String[] argv = {}; + final int TOTAL_RECORDS = 1; + + // null column type means don't create a database column + // the Avro value will not be exported + ColumnGenerator gen = colGenerator(100, Schema.create(Schema.Type.INT), + null, null); + createAvroFile(0, TOTAL_RECORDS, gen); + createTable(gen); + runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1))); + verifyExport(TOTAL_RECORDS); + } + + // Test Case for Issue [SQOOP-2846] + @Test + public void testAvroWithUpsert() throws IOException, SQLException { + String[] argv = { "--update-key", "ID", "--update-mode", "allowinsert" }; + final int TOTAL_RECORDS = 2; + // ColumnGenerator gen = colGenerator("100", + // Schema.create(Schema.Type.STRING), null, "VARCHAR(64)"); + createAvroFile(0, TOTAL_RECORDS, null); + createTableWithInsert(); + + thrown.expect(Exception.class); + thrown.reportMissingExceptionWithMessage("Expected Exception during Avro export with --update-mode"); + runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1))); + } + + // Test Case for Issue [SQOOP-2846] + @Test + public void testAvroWithUpdateKey() throws IOException, SQLException { + String[] argv = { "--update-key", "ID" }; + final int TOTAL_RECORDS = 1; + // ColumnGenerator gen = colGenerator("100", + // Schema.create(Schema.Type.STRING), null, "VARCHAR(64)"); + createAvroFile(0, TOTAL_RECORDS, null); + createTableWithInsert(); + runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1))); + verifyExport(getMsgPrefix() + "0"); + } + + @Test + public void testMissingAvroFields() throws IOException, SQLException { + String[] argv = {}; + final int TOTAL_RECORDS = 1; + + // null Avro schema means don't create an Avro field + ColumnGenerator gen = colGenerator(null, null, null, "VARCHAR(64)"); + createAvroFile(0, TOTAL_RECORDS, gen); + createTable(gen); + + thrown.expect(Exception.class); + thrown.reportMissingExceptionWithMessage("Expected Exception on missing Avro fields"); + runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1))); + } + + @Test + public void testSpecifiedColumnsAsAvroFields() throws IOException, SQLException { + final int TOTAL_RECORDS = 10; + ColumnGenerator[] gens = new ColumnGenerator[] { + colGenerator(000, Schema.create(Schema.Type.INT), 100, "INTEGER"), //col0 + colGenerator(111, Schema.create(Schema.Type.INT), 100, "INTEGER"), //col1 + colGenerator(222, Schema.create(Schema.Type.INT), 100, "INTEGER"), //col2 + colGenerator(333, Schema.create(Schema.Type.INT), 100, "INTEGER") //col3 + }; + createAvroFile(0, 
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/TestAvroImport.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/TestAvroImport.java b/src/test/org/apache/sqoop/TestAvroImport.java
new file mode 100644
index 0000000..1172fc5
--- /dev/null
+++ b/src/test/org/apache/sqoop/TestAvroImport.java
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.avro.util.Utf8;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.sqoop.testutil.BaseSqoopTestCase;
+import org.apache.sqoop.testutil.CommonArgs;
+import org.apache.sqoop.testutil.HsqldbTestServer;
+import org.apache.sqoop.testutil.ImportJobTestCase;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests --as-avrodatafile.
+ */
+public class TestAvroImport extends ImportJobTestCase {
+
+  public static final Log LOG = LogFactory
+      .getLog(TestAvroImport.class.getName());
+
+  /**
+   * Create the argv to pass to Sqoop.
+   *
+   * @return the argv as an array of strings.
+   */
+  protected String[] getOutputArgv(boolean includeHadoopFlags,
+      String[] extraArgs) {
+    ArrayList<String> args = new ArrayList<String>();
+
+    if (includeHadoopFlags) {
+      CommonArgs.addHadoopFlags(args);
+    }
+    args.add("-m");
+    args.add("1");
+    args.add("--table");
+    args.add(getTableName());
+    args.add("--connect");
+    args.add(HsqldbTestServer.getUrl());
+    args.add("--warehouse-dir");
+    args.add(getWarehouseDir());
+    args.add("--split-by");
+    args.add("INTFIELD1");
+    args.add("--as-avrodatafile");
+    if (extraArgs != null) {
+      args.addAll(Arrays.asList(extraArgs));
+    }
+
+    return args.toArray(new String[0]);
+  }
+
+  @Test
+  public void testAvroImport() throws IOException {
+    this.setCurTableName("Avro_Import_Test");
+    avroImportTestHelper(null, null);
+  }
+
+  @Test
+  public void testDeflateCompressedAvroImport() throws IOException {
+    this.setCurTableName("Deflate_Compressed_Avro_Import_Test_1");
+    avroImportTestHelper(new String[] {"--compression-codec",
+      "org.apache.hadoop.io.compress.DefaultCodec", }, "deflate");
+  }
+
+  @Test
+  public void testDefaultCompressedAvroImport() throws IOException {
+    this.setCurTableName("Deflate_Compressed_Avro_Import_Test_2");
+    avroImportTestHelper(new String[] {"--compress", }, "deflate");
+  }
+
+  @Test
+  public void testUnsupportedCodec() throws IOException {
+    try {
+      this.setCurTableName("Deflate_Compressed_Avro_Import_Test_3");
+      avroImportTestHelper(new String[] {"--compression-codec", "foobar", },
+          null);
+      fail("Expected IOException");
+    } catch (IOException e) {
+      // Exception is expected
+    }
+  }
+
+  /**
+   * Helper method that runs an import using Avro with optional command line
+   * arguments and checks that the created file matches the expectations.
+   * <p/>
+   * This can be used to test various extra options that are implemented for
+   * the Avro input.
+   *
+   * @param extraArgs extra command line arguments to pass to Sqoop in addition
+   *                  to those that {@link #getOutputArgv(boolean, String[])}
+   *                  returns
+   */
+  protected void avroImportTestHelper(String[] extraArgs, String codec)
+      throws IOException {
+    String[] types =
+        {"BIT", "INTEGER", "BIGINT", "REAL", "DOUBLE", "VARCHAR(6)",
+            "VARBINARY(2)", "DECIMAL(3,2)"};
+    String[] vals = {"true", "100", "200", "1.0", "2.0", "'s'", "'0102'", "'1.00'"};
+    createTableWithColTypes(types, vals);
+
+    runImport(getOutputArgv(true, extraArgs));
+
+    Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
+    DataFileReader<GenericRecord> reader = read(outputFile);
+    Schema schema = reader.getSchema();
+    assertEquals(Schema.Type.RECORD, schema.getType());
+    List<Field> fields = schema.getFields();
+    assertEquals(types.length, fields.size());
+
+    checkField(fields.get(0), "DATA_COL0", Schema.Type.BOOLEAN);
+    checkField(fields.get(1), "DATA_COL1", Schema.Type.INT);
+    checkField(fields.get(2), "DATA_COL2", Schema.Type.LONG);
+    checkField(fields.get(3), "DATA_COL3", Schema.Type.FLOAT);
+    checkField(fields.get(4), "DATA_COL4", Schema.Type.DOUBLE);
+    checkField(fields.get(5), "DATA_COL5", Schema.Type.STRING);
+    checkField(fields.get(6), "DATA_COL6", Schema.Type.BYTES);
+    checkField(fields.get(7), "DATA_COL7", Schema.Type.STRING);
+
+    GenericRecord record1 = reader.next();
+    assertEquals("DATA_COL0", true, record1.get("DATA_COL0"));
+    assertEquals("DATA_COL1", 100, record1.get("DATA_COL1"));
+    assertEquals("DATA_COL2", 200L, record1.get("DATA_COL2"));
+    assertEquals("DATA_COL3", 1.0f, record1.get("DATA_COL3"));
+    assertEquals("DATA_COL4", 2.0, record1.get("DATA_COL4"));
+    assertEquals("DATA_COL5", new Utf8("s"), record1.get("DATA_COL5"));
+    Object object = record1.get("DATA_COL6");
+    assertTrue(object instanceof ByteBuffer);
+    ByteBuffer b = ((ByteBuffer) object);
+    assertEquals((byte) 1, b.get(0));
+    assertEquals((byte) 2, b.get(1));
+    assertEquals("DATA_COL7", "1.00", record1.get("DATA_COL7").toString());
+
+    if (codec != null) {
+      assertEquals(codec, reader.getMetaString(DataFileConstants.CODEC));
+    }
+
+    checkSchemaFile(schema);
+  }
+
+  @Test
+  public void testOverrideTypeMapping() throws IOException {
+    String [] types = { "INT" };
+    String [] vals = { "10" };
+    createTableWithColTypes(types, vals);
+
+    String [] extraArgs = { "--map-column-java", "DATA_COL0=String"};
+
+    runImport(getOutputArgv(true, extraArgs));
+
+    Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
+    DataFileReader<GenericRecord> reader = read(outputFile);
+    Schema schema = reader.getSchema();
+    assertEquals(Schema.Type.RECORD, schema.getType());
+    List<Field> fields = schema.getFields();
+    assertEquals(types.length, fields.size());
+
+    checkField(fields.get(0), "DATA_COL0", Schema.Type.STRING);
+
+    GenericRecord record1 = reader.next();
+    assertEquals("DATA_COL0", new Utf8("10"), record1.get("DATA_COL0"));
+  }
+
+  @Test
+  public void testFirstUnderscoreInColumnName() throws IOException {
+    String [] names = { "_NAME" };
+    String [] types = { "INT" };
+    String [] vals = { "1987" };
+    createTableWithColTypesAndNames(names, types, vals);
+
+    runImport(getOutputArgv(true, null));
+
+    Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
+    DataFileReader<GenericRecord> reader = read(outputFile);
+    Schema schema = reader.getSchema();
+    assertEquals(Schema.Type.RECORD, schema.getType());
+    List<Field> fields = schema.getFields();
+    assertEquals(types.length, fields.size());
+
+    checkField(fields.get(0),
"__NAME", Type.INT); + + GenericRecord record1 = reader.next(); + assertEquals("__NAME", 1987, record1.get("__NAME")); + } + + @Test + public void testNonstandardCharactersInColumnName() throws IOException { + String [] names = { "avro\uC3A11" }; + String [] types = { "INT" }; + String [] vals = { "1987" }; + this.setCurTableName("Non_Std_Character_Test"); + createTableWithColTypesAndNames(names, types, vals); + + runImport(getOutputArgv(true, null)); + + Path outputFile = new Path(getTablePath(), "part-m-00000.avro"); + DataFileReader<GenericRecord> reader = read(outputFile); + Schema schema = reader.getSchema(); + assertEquals(Schema.Type.RECORD, schema.getType()); + List<Field> fields = schema.getFields(); + assertEquals(types.length, fields.size()); + + checkField(fields.get(0), "AVRO\uC3A11", Type.INT); + + GenericRecord record1 = reader.next(); + assertEquals("AVRO\uC3A11", 1987, record1.get("AVRO\uC3A11")); + } + + @Test + public void testNonIdentCharactersInColumnName() throws IOException { + String [] names = { "test_a-v+r/o" }; + String [] types = { "INT" }; + String [] vals = { "2015" }; + createTableWithColTypesAndNames(names, types, vals); + + runImport(getOutputArgv(true, null)); + + Path outputFile = new Path(getTablePath(), "part-m-00000.avro"); + DataFileReader<GenericRecord> reader = read(outputFile); + Schema schema = reader.getSchema(); + assertEquals(Schema.Type.RECORD, schema.getType()); + List<Field> fields = schema.getFields(); + assertEquals(types.length, fields.size()); + + checkField(fields.get(0), "TEST_A_V_R_O", Type.INT); + + GenericRecord record1 = reader.next(); + assertEquals("TEST_A_V_R_O", 2015, record1.get("TEST_A_V_R_O")); + } + + /* + * Test Case For checking multiple columns having non standard characters in multiple columns + */ + @Test + public void testNonstandardCharactersInMultipleColumns() throws IOException { + String[] names = { "id$1", "id1$" }; + String[] types = { "INT", "INT" }; + String[] vals = { "1987", "1988" }; + this.setCurTableName("Non_Std_Character_Test_For_Multiple_Columns"); + createTableWithColTypesAndNames(names, types, vals); + + runImport(getOutputArgv(true, null)); + + Path outputFile = new Path(getTablePath(), "part-m-00000.avro"); + DataFileReader<GenericRecord> reader = read(outputFile); + Schema schema = reader.getSchema(); + assertEquals(Schema.Type.RECORD, schema.getType()); + List<Field> fields = schema.getFields(); + assertEquals(types.length, fields.size()); + + checkField(fields.get(0), "ID_1", Type.INT); + + GenericRecord record1 = reader.next(); + assertEquals("ID_1", 1987, record1.get("ID_1")); + checkField(fields.get(1), "ID1_", Type.INT); + assertEquals("ID1_", 1988, record1.get("ID1_")); + } + + protected void checkField(Field field, String name, Type type) { + assertEquals(name, field.name()); + assertEquals(Schema.Type.UNION, field.schema().getType()); + assertEquals(Schema.Type.NULL, field.schema().getTypes().get(0).getType()); + assertEquals(type, field.schema().getTypes().get(1).getType()); + } + + @Test + public void testNullableAvroImport() throws IOException, SQLException { + String [] types = { "INT" }; + String [] vals = { null }; + createTableWithColTypes(types, vals); + + runImport(getOutputArgv(true, null)); + + Path outputFile = new Path(getTablePath(), "part-m-00000.avro"); + DataFileReader<GenericRecord> reader = read(outputFile); + + GenericRecord record1 = reader.next(); + assertNull(record1.get("DATA_COL0")); + } + + @Test + public void testSpecialCharactersInColumnMappingWithConvertion() 
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/TestAvroImportExportRoundtrip.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/TestAvroImportExportRoundtrip.java b/src/test/org/apache/sqoop/TestAvroImportExportRoundtrip.java
new file mode 100644
index 0000000..6de09f3
--- /dev/null
+++ b/src/test/org/apache/sqoop/TestAvroImportExportRoundtrip.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop;
+
+import org.apache.sqoop.testutil.CommonArgs;
+import org.apache.sqoop.testutil.HsqldbTestServer;
+import org.apache.sqoop.testutil.ImportJobTestCase;
+import org.apache.sqoop.tool.ExportTool;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests importing a database table as an Avro Data File then back to the
+ * database.
+ */
+public class TestAvroImportExportRoundtrip extends ImportJobTestCase {
+
+  public static final Log LOG = LogFactory
+      .getLog(TestAvroImportExportRoundtrip.class.getName());
+
+  @Test
+  public void testRoundtripQuery() throws IOException, SQLException {
+    String[] argv = {};
+
+    runImport(getOutputArgvForQuery(true));
+    deleteTableData();
+    runExport(getExportArgvForQuery(true, 10, 10, newStrArray(argv, "-m",
+        "" + 1)));
+
+    checkFirstColumnSum();
+  }
+
+  @Test
+  public void testRoundtrip() throws IOException, SQLException {
+    String[] argv = {};
+
+    runImport(getOutputArgv(true));
+    deleteTableData();
+    runExport(getExportArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
+
+    checkFirstColumnSum();
+  }
+
+  /**
+   * Create the argv to pass to Sqoop.
+   *
+   * @return the argv as an array of strings.
+   */
+  protected String[] getOutputArgv(boolean includeHadoopFlags) {
+    ArrayList<String> args = new ArrayList<String>();
+
+    if (includeHadoopFlags) {
+      CommonArgs.addHadoopFlags(args);
+    }
+
+    args.add("--table");
+    args.add(HsqldbTestServer.getTableName());
+    args.add("--connect");
+    args.add(HsqldbTestServer.getUrl());
+    args.add("--warehouse-dir");
+    args.add(getWarehouseDir());
+    args.add("--split-by");
+    args.add("INTFIELD1");
+    args.add("--as-avrodatafile");
+
+    return args.toArray(new String[0]);
+  }
+
+  /**
+   * Create the argv to pass to Sqoop.
+   *
+   * @return the argv as an array of strings.
+   */
+  protected String[] getOutputArgvForQuery(boolean includeHadoopFlags) {
+    ArrayList<String> args = new ArrayList<String>();
+
+    if (includeHadoopFlags) {
+      CommonArgs.addHadoopFlags(args);
+    }
+
+    args.add("--query");
+    args.add("select * from " + HsqldbTestServer.getTableName()
+        + " where $CONDITIONS");
+    args.add("--connect");
+    args.add(HsqldbTestServer.getUrl());
+    args.add("--target-dir");
+    args.add(getWarehouseDir() + "/query_result");
+    args.add("--split-by");
+    args.add("INTFIELD1");
+    args.add("--as-avrodatafile");
+
+    return args.toArray(new String[0]);
+  }
+
+  protected String [] getExportArgv(boolean includeHadoopFlags,
+      int rowsPerStmt, int statementsPerTx, String... additionalArgv) {
+    ArrayList<String> args = formatAdditionalArgs(additionalArgv);
+
+    args.add("--table");
+    args.add(getTableName());
+    args.add("--export-dir");
+    args.add(getTablePath().toString());
+    args.add("--connect");
+    args.add(getConnectString());
+    args.add("-m");
+    args.add("1");
+
+    LOG.debug("args:");
+    for (String a : args) {
+      LOG.debug("  " + a);
+    }
+
+    return args.toArray(new String[0]);
+  }
+
+  protected String [] getExportArgvForQuery(boolean includeHadoopFlags,
+      int rowsPerStmt, int statementsPerTx, String... additionalArgv) {
+    ArrayList<String> args = formatAdditionalArgs(additionalArgv);
+
+    args.add("--table");
+    args.add(getTableName());
+    args.add("--export-dir");
+    args.add(getWarehouseDir() + "/query_result");
+    args.add("--connect");
+    args.add(getConnectString());
+    args.add("-m");
+    args.add("1");
+
+    LOG.debug("args:");
+    for (String a : args) {
+      LOG.debug("  " + a);
+    }
+
+    return args.toArray(new String[0]);
+  }
+
+  protected ArrayList<String> formatAdditionalArgs(String... additionalArgv) {
+    ArrayList<String> args = new ArrayList<String>();
+
+    // Any additional Hadoop flags (-D foo=bar) are prepended.
+    if (null != additionalArgv) {
+      boolean prevIsFlag = false;
+      for (String arg : additionalArgv) {
+        if (arg.equals("-D")) {
+          args.add(arg);
+          prevIsFlag = true;
+        } else if (prevIsFlag) {
+          args.add(arg);
+          prevIsFlag = false;
+        }
+      }
+    }
+
+    // The sqoop-specific additional args are then added.
+    if (null != additionalArgv) {
+      boolean prevIsFlag = false;
+      for (String arg : additionalArgv) {
+        if (arg.equals("-D")) {
+          prevIsFlag = true;
+          continue;
+        } else if (prevIsFlag) {
+          prevIsFlag = false;
+          continue;
+        } else {
+          // normal argument.
+          args.add(arg);
+        }
+      }
+    }
+    return args;
+  }
+
+  // This test just uses the two-int-column table.
+  protected String getTableName() {
+    return HsqldbTestServer.getTableName();
+  }
+
+  private void deleteTableData() throws SQLException {
+    Connection conn = getConnection();
+    PreparedStatement statement = conn.prepareStatement(
+        "DELETE FROM " + getTableName(),
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try {
+      statement.executeUpdate();
+      conn.commit();
+    } finally {
+      statement.close();
+    }
+  }
+
+  /**
+   * Run a MapReduce-based export (using the argv provided to control
+   * execution).
+   * @return the generated jar filenames
+   */
+  protected List<String> runExport(String [] argv) throws IOException {
+    // run the tool through the normal entry-point.
+    int ret;
+    List<String> generatedJars = null;
+    try {
+      ExportTool exporter = new ExportTool();
+      Sqoop sqoop = new Sqoop(exporter);
+      ret = Sqoop.runSqoop(sqoop, argv);
+      generatedJars = exporter.getGeneratedJarFiles();
+    } catch (Exception e) {
+      LOG.error("Got exception running Sqoop: "
+          + StringUtils.stringifyException(e));
+      ret = 1;
+    }
+
+    // expect a successful return.
+    if (0 != ret) {
+      throw new IOException("Failure during job; return status " + ret);
+    }
+
+    return generatedJars;
+  }
+
+  private void checkFirstColumnSum() throws SQLException {
+    Connection conn = getConnection();
+
+    PreparedStatement statement = conn.prepareStatement(
+        "SELECT SUM(INTFIELD1) FROM " + getTableName(),
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    int actualVal = 0;
+    try {
+      ResultSet rs = statement.executeQuery();
+      try {
+        rs.next();
+        actualVal = rs.getInt(1);
+      } finally {
+        rs.close();
+      }
+    } finally {
+      statement.close();
+    }
+
+    assertEquals("First column sum", HsqldbTestServer.getFirstColSum(),
+        actualVal);
+  }
+}
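
formatAdditionalArgs() above makes two passes over the same array, first collecting "-D" flag/value pairs and then everything else. A single-pass equivalent (a sketch, not part of the patch; the class name is illustrative) may make the intent easier to see:

    // Illustrative sketch only -- not part of this patch. Single-pass
    // equivalent of formatAdditionalArgs(): "-D key=value" pairs are
    // collected first, every other argument is appended afterwards.
    import java.util.ArrayList;
    import java.util.List;

    public class ArgPartitionSketch {
      static List<String> partition(String... additionalArgv) {
        List<String> hadoopFlags = new ArrayList<String>();
        List<String> sqoopArgs = new ArrayList<String>();
        boolean prevIsFlag = false;
        if (additionalArgv != null) {
          for (String arg : additionalArgv) {
            if (arg.equals("-D")) {
              hadoopFlags.add(arg);
              prevIsFlag = true;
            } else if (prevIsFlag) {
              hadoopFlags.add(arg);   // the value that follows "-D"
              prevIsFlag = false;
            } else {
              sqoopArgs.add(arg);     // normal argument
            }
          }
        }
        hadoopFlags.addAll(sqoopArgs); // Hadoop flags come first
        return hadoopFlags;
      }

      public static void main(String[] args) {
        System.out.println(partition("-D", "foo=bar", "-m", "1"));
        // prints [-D, foo=bar, -m, 1]
      }
    }
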
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/TestBigDecimalExport.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/TestBigDecimalExport.java b/src/test/org/apache/sqoop/TestBigDecimalExport.java
index 414e3d9..ccea173 100644
--- a/src/test/org/apache/sqoop/TestBigDecimalExport.java
+++ b/src/test/org/apache/sqoop/TestBigDecimalExport.java
@@ -33,8 +33,8 @@ import java.util.List;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import com.cloudera.sqoop.testutil.CommonArgs;
-import com.cloudera.sqoop.testutil.ExportJobTestCase;
+import org.apache.sqoop.testutil.CommonArgs;
+import org.apache.sqoop.testutil.ExportJobTestCase;
 import org.junit.Test;
 import static org.junit.Assert.assertEquals;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/TestBigDecimalImport.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/TestBigDecimalImport.java b/src/test/org/apache/sqoop/TestBigDecimalImport.java
index d265d17..286f54e 100644
--- a/src/test/org/apache/sqoop/TestBigDecimalImport.java
+++ b/src/test/org/apache/sqoop/TestBigDecimalImport.java
@@ -28,8 +28,8 @@ import java.util.List;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import com.cloudera.sqoop.testutil.CommonArgs;
-import com.cloudera.sqoop.testutil.ImportJobTestCase;
+import org.apache.sqoop.testutil.CommonArgs;
+import org.apache.sqoop.testutil.ImportJobTestCase;
 import org.junit.Test;
 import static org.junit.Assert.assertEquals;
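
The two TestBigDecimal* hunks are purely mechanical: they swap the deprecated com.cloudera.sqoop.testutil imports for their org.apache.sqoop.testutil counterparts, matching the package move in the rest of this commit. As a side note on the roundtrip test above, the nested try/finally in checkFirstColumnSum() is the pre-Java-7 way of closing JDBC resources; a try-with-resources sketch of the same sum check (illustrative only; the JDBC URL and table name are placeholders) would be:

    // Illustrative sketch only -- not part of this patch. Java 7+
    // try-with-resources version of the checkFirstColumnSum() pattern.
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    public class SumCheckSketch {
      // The JDBC URL and table name are placeholders.
      static int firstColumnSum(String jdbcUrl, String tableName) throws SQLException {
        try (Connection conn = DriverManager.getConnection(jdbcUrl);
             PreparedStatement stmt = conn.prepareStatement(
                 "SELECT SUM(INTFIELD1) FROM " + tableName);
             ResultSet rs = stmt.executeQuery()) {
          // SUM over an empty table yields NULL; getInt() then returns 0.
          return rs.next() ? rs.getInt(1) : 0;
        }
      }
    }
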
