Author: jarcec
Date: Wed Apr 4 06:58:52 2012
New Revision: 1309268
URL: http://svn.apache.org/viewvc?rev=1309268&view=rev
Log:
SQOOP-468. Oracle free form queries fail.
(Cheolsoo Park via Jarek Jarcec Cecho)
Added:
sqoop/trunk/src/test/com/cloudera/sqoop/TestFreeFormQueryImport.java
sqoop/trunk/src/test/com/cloudera/sqoop/manager/MySQLFreeFormQueryTest.java
sqoop/trunk/src/test/com/cloudera/sqoop/manager/OracleFreeFormQueryTest.java
Modified:
sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
Modified:
sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
URL:
http://svn.apache.org/viewvc/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java?rev=1309268&r1=1309267&r2=1309268&view=diff
==============================================================================
--- sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
(original)
+++ sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
Wed Apr 4 06:58:52 2012
@@ -111,6 +111,36 @@ public class DataDrivenImportJob extends
return null;
}
+ /**
+ * Build the boundary query (MIN and MAX of the split column) for the
+ * column of the result set created by the given query.
+ *
+ * <p>The connection manager is asked first through getInputBoundsQuery();
+ * only when it returns null is the generic
+ * "SELECT MIN(col), MAX(col) FROM (query) AS t1" statement built here.
+ *
+ * @param col column name whose boundaries we're interested in; may be
+ * qualified with a table name, and may be null.
+ * @param query sub-query used to create the result set.
+ * @return input boundary query as a string, or an empty string when col
+ * is null.
+ */
+ private String buildBoundaryQuery(String col, String query) {
+ if (col == null) {
+ return "";
+ }
+
+ // Replace everything before the last '.' with the sub-query alias 't1'
+ // if the column name is qualified. This is needed because
+ // "tableName"."columnName" in the input boundary query causes a SQL
+ // syntax error in most dbs including Oracle and MySQL.
+ String alias = "t1";
+ int dot = col.lastIndexOf('.');
+ String qualifiedName = (dot == -1) ? col : alias + col.substring(dot);
+
+ ConnManager mgr = getContext().getConnManager();
+ String ret = mgr.getInputBoundsQuery(qualifiedName, query);
+ if (ret != null) {
+ return ret;
+ }
+
+ return "SELECT MIN(" + qualifiedName + "), MAX(" + qualifiedName + ") "
+ + "FROM (" + query + ") AS " + alias;
+ }
+
@Override
protected void configureInputFormat(Job job, String tableName,
String tableClassName, String splitByCol) throws IOException {
@@ -165,18 +195,8 @@ public class DataDrivenImportJob extends
DataDrivenDBInputFormat.SUBSTITUTE_TOKEN, " (1 = 1) ");
String inputBoundingQuery = options.getBoundaryQuery();
-
if (inputBoundingQuery == null) {
- inputBoundingQuery =
- mgr.getInputBoundsQuery(splitByCol, sanitizedQuery);
- if (inputBoundingQuery == null) {
- if (splitByCol != null) {
- inputBoundingQuery = "SELECT MIN(" + splitByCol + "), MAX("
- + splitByCol + ") FROM (" + sanitizedQuery + ") AS t1";
- } else {
- inputBoundingQuery = "";
- }
- }
+ inputBoundingQuery = buildBoundaryQuery(splitByCol, sanitizedQuery);
}
DataDrivenDBInputFormat.setInput(job, DBWritable.class,
inputQuery, inputBoundingQuery);
Added: sqoop/trunk/src/test/com/cloudera/sqoop/TestFreeFormQueryImport.java
URL:
http://svn.apache.org/viewvc/sqoop/trunk/src/test/com/cloudera/sqoop/TestFreeFormQueryImport.java?rev=1309268&view=auto
==============================================================================
--- sqoop/trunk/src/test/com/cloudera/sqoop/TestFreeFormQueryImport.java (added)
+++ sqoop/trunk/src/test/com/cloudera/sqoop/TestFreeFormQueryImport.java Wed
Apr 4 06:58:52 2012
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+
+import com.cloudera.sqoop.testutil.CommonArgs;
+import com.cloudera.sqoop.testutil.ImportJobTestCase;
+
+/**
+ * Test free form query import.
+ *
+ * <p>Creates two tables, imports the result of a join between them through
+ * the --query option with a table-qualified --split-by column, and checks
+ * the first line of the generated output file.
+ *
+ * <p>Database-specific variants (MySQLFreeFormQueryTest,
+ * OracleFreeFormQueryTest) extend this class and override hooks such as
+ * getConnectString(), getSqoopOptions() and dropTableIfExists().
+ */
+public class TestFreeFormQueryImport extends ImportJobTestCase {
+
+ private Log log;
+
+ public TestFreeFormQueryImport() {
+ this.log = LogFactory.getLog(TestFreeFormQueryImport.class.getName());
+ }
+
+ /**
+ * NOTE(review): tearDown() in this class logs through the log field
+ * directly rather than through this method.
+ * @return the Log object to use for reporting during this test
+ */
+ protected Log getLogger() {
+ return log;
+ }
+
+ /**
+ * The names of the tables we're creating; tearDown() drops each of them.
+ * NOTE(review): this field has no initializer and is only assigned at the
+ * start of testSimpleJoin(), so tearDown() would throw a
+ * NullPointerException if it ran before that assignment.
+ */
+ private List<String> tableNames;
+
+ @Override
+ public void tearDown() {
+ // Clean up the database on our way out; drop failures are only logged.
+ for (String tableName : tableNames) {
+ try {
+ dropTableIfExists(tableName);
+ } catch (SQLException e) {
+ log.warn("Error trying to drop table '" + tableName
+ + "' on tearDown: " + e);
+ }
+ }
+ super.tearDown();
+ }
+
+ /**
+ * Create the argv to pass to Sqoop: the flags added by
+ * CommonArgs.addHadoopFlags(), followed by --connect, --target-dir,
+ * --split-by, --num-mappers (fixed at "2") and --query.
+ * @param splitByCol column of the table used to split work.
+ * @param query free form query to be used.
+ * @return the argv as an array of strings.
+ */
+ protected String [] getArgv(String splitByCol, String query) {
+ ArrayList<String> args = new ArrayList<String>();
+
+ CommonArgs.addHadoopFlags(args);
+
+ args.add("--connect");
+ args.add(getConnectString());
+ args.add("--target-dir");
+ args.add(getWarehouseDir());
+ args.add("--split-by");
+ args.add(splitByCol);
+ args.add("--num-mappers");
+ args.add("2");
+ args.add("--query");
+ args.add(query);
+
+ return args.toArray(new String[0]);
+ }
+
+ /**
+ * Create two tables that share the common id column. Run free-form query
+ * import on the result table that is created by joining the two tables on
+ * the id column.
+ *
+ * <p>The split-by column is passed in its table-qualified form
+ * (tableName1 + "." + getColName(0)). After the import, the first line of
+ * "part-m-00000" under the warehouse dir is read, from the local file
+ * system or through the Hadoop FileSystem when isOnPhysicalCluster()
+ * returns true, and is expected to equal "1,foo".
+ *
+ * @throws IOException if reading the output file fails (and presumably if
+ * runImport() fails; its contract is not visible here).
+ */
+ public void testSimpleJoin() throws IOException {
+ tableNames = new ArrayList<String>();
+
+ String [] types1 = { "SMALLINT", };
+ String [] vals1 = { "1", };
+ String tableName1 = getTableName();
+ createTableWithColTypes(types1, vals1);
+ tableNames.add(tableName1);
+
+ incrementTableNum();
+
+ String [] types2 = { "SMALLINT", "VARCHAR(32)", };
+ String [] vals2 = { "1", "'foo'", };
+ String tableName2 = getTableName();
+ createTableWithColTypes(types2, vals2);
+ tableNames.add(tableName2);
+
+ String query = "SELECT "
+ + tableName1 + "." + getColName(0) + ", "
+ + tableName2 + "." + getColName(1) + " "
+ + "FROM " + tableName1 + " JOIN " + tableName2 + " ON ("
+ + tableName1 + "." + getColName(0) + " = "
+ + tableName2 + "." + getColName(0) + ") WHERE "
+ + tableName1 + "." + getColName(0) + " < 3 AND $CONDITIONS";
+
+ runImport(getArgv(tableName1 + "." + getColName(0), query));
+
+ Path warehousePath = new Path(this.getWarehouseDir());
+ Path filePath = new Path(warehousePath, "part-m-00000");
+ String expectedVal = "1,foo";
+
+ BufferedReader reader = null;
+ if (!isOnPhysicalCluster()) {
+ reader = new BufferedReader(
+ new InputStreamReader(new FileInputStream(
+ new File(filePath.toString()))));
+ } else {
+ FileSystem dfs = FileSystem.get(getConf());
+ FSDataInputStream dis = dfs.open(filePath);
+ reader = new BufferedReader(new InputStreamReader(dis));
+ }
+ try {
+ String line = reader.readLine();
+ assertEquals("QueryResult expected a different string",
+ expectedVal, line);
+ } finally {
+ IOUtils.closeStream(reader);
+ }
+ }
+}
Added:
sqoop/trunk/src/test/com/cloudera/sqoop/manager/MySQLFreeFormQueryTest.java
URL:
http://svn.apache.org/viewvc/sqoop/trunk/src/test/com/cloudera/sqoop/manager/MySQLFreeFormQueryTest.java?rev=1309268&view=auto
==============================================================================
--- sqoop/trunk/src/test/com/cloudera/sqoop/manager/MySQLFreeFormQueryTest.java
(added)
+++ sqoop/trunk/src/test/com/cloudera/sqoop/manager/MySQLFreeFormQueryTest.java
Wed Apr 4 06:58:52 2012
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.manager;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.TestFreeFormQueryImport;
+
+/**
+ * Test free form query import with the MySQL db.
+ *
+ * <p>Runs the tests inherited from TestFreeFormQueryImport with
+ * useHsqldbTestServer() returning false, connecting to
+ * MySQLTestUtils.CONNECT_STRING as the user returned by
+ * MySQLTestUtils.getCurrentUser(). Tables are dropped with a
+ * "DROP TABLE IF EXISTS" statement followed by an explicit commit; the
+ * statement is closed in a finally block.
+ */
+public class MySQLFreeFormQueryTest extends TestFreeFormQueryImport {
+
+ public static final Log LOG = LogFactory.getLog(
+ MySQLFreeFormQueryTest.class.getName());
+
+ @Override
+ protected Log getLogger() {
+ return LOG;
+ }
+
+ @Override
+ protected boolean useHsqldbTestServer() {
+ return false;
+ }
+
+ @Override
+ protected String getConnectString() {
+ return MySQLTestUtils.CONNECT_STRING;
+ }
+
+ @Override
+ protected SqoopOptions getSqoopOptions(Configuration conf) {
+ SqoopOptions opts = new SqoopOptions(conf);
+ opts.setUsername(MySQLTestUtils.getCurrentUser());
+ return opts;
+ }
+
+ @Override
+ protected void dropTableIfExists(String table) throws SQLException {
+ Connection conn = getManager().getConnection();
+ PreparedStatement statement = conn.prepareStatement(
+ "DROP TABLE IF EXISTS " + table,
+ ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ try {
+ statement.executeUpdate();
+ conn.commit();
+ } finally {
+ statement.close();
+ }
+ }
+}
Added:
sqoop/trunk/src/test/com/cloudera/sqoop/manager/OracleFreeFormQueryTest.java
URL:
http://svn.apache.org/viewvc/sqoop/trunk/src/test/com/cloudera/sqoop/manager/OracleFreeFormQueryTest.java?rev=1309268&view=auto
==============================================================================
---
sqoop/trunk/src/test/com/cloudera/sqoop/manager/OracleFreeFormQueryTest.java
(added)
+++
sqoop/trunk/src/test/com/cloudera/sqoop/manager/OracleFreeFormQueryTest.java
Wed Apr 4 06:58:52 2012
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.manager;
+
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.TestFreeFormQueryImport;
+
+/**
+ * Test free form query import with the Oracle db.
+ *
+ * <p>Runs the tests inherited from TestFreeFormQueryImport with
+ * useHsqldbTestServer() returning false, connecting to
+ * OracleUtils.CONNECT_STRING. Credentials are applied to the options by
+ * OracleUtils.setOracleAuth() and tables are dropped through
+ * OracleUtils.dropTable().
+ *
+ * <p>NOTE(review): LOG is declared here but getLogger() is not overridden,
+ * so the logger returned by the parent class is the one in effect.
+ */
+public class OracleFreeFormQueryTest extends TestFreeFormQueryImport {
+
+ public static final Log LOG = LogFactory.getLog(
+ OracleFreeFormQueryTest.class.getName());
+
+ @Override
+ protected boolean useHsqldbTestServer() {
+ return false;
+ }
+
+ @Override
+ protected String getConnectString() {
+ return OracleUtils.CONNECT_STRING;
+ }
+
+ @Override
+ protected SqoopOptions getSqoopOptions(Configuration conf) {
+ SqoopOptions opts = new SqoopOptions(conf);
+ OracleUtils.setOracleAuth(opts);
+ return opts;
+ }
+
+ @Override
+ protected void dropTableIfExists(String table) throws SQLException {
+ OracleUtils.dropTable(table, getManager());
+ }
+}
+