http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleQueries.java
----------------------------------------------------------------------
diff --git 
a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleQueries.java
 
b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleQueries.java
new file mode 100644
index 0000000..bb199c3
--- /dev/null
+++ 
b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleQueries.java
@@ -0,0 +1,1721 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.jdbc.oracle.util;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.math.BigDecimal;
+import java.security.InvalidParameterException;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.connector.jdbc.oracle.OracleJdbcConnectorConstants;
+import org.apache.sqoop.schema.type.Column;
+import org.joda.time.DateTimeZone;
+
+/**
+ * Contains the queries to get data dictionary information from Oracle 
database.
+ */
+public final class OracleQueries {
+
+  private static final Logger LOG =
+      Logger.getLogger(OracleQueries.class);
+
+  private static Class<?> oracleConnectionClass;
+  private static Class<?> oracleStatementClass;
+  private static Class<?> oracleResultSetClass;
+  private static Class<?> oracleTypesClass;
+  private static Class<?> oracleDateClass;
+  private static Method methSetLongAtName;
+  private static Method methSetBigDecimalAtName;
+  private static Method methSetStringAtName;
+  private static Method methSetTimestampAtName;
+  private static Method methSetBinaryDoubleAtName;
+  private static Method methSetObjectAtName;
+  private static Method methSetBinaryFloatAtName;
+  private static Method methSetIntAtName;
+
+  private static final Map<String, Integer> ORACLE_TYPES =
+      new HashMap<String, Integer>();
+
+  static {
+    try {
+      oracleStatementClass =
+          Class.forName("oracle.jdbc.OraclePreparedStatement");
+      methSetLongAtName =
+          oracleStatementClass.getMethod("setLongAtName", String.class,
+              long.class);
+      methSetBigDecimalAtName =
+          oracleStatementClass.getMethod("setBigDecimalAtName", String.class,
+              BigDecimal.class);
+      methSetStringAtName =
+          oracleStatementClass.getMethod("setStringAtName", String.class,
+              String.class);
+      methSetTimestampAtName =
+          oracleStatementClass.getMethod("setTimestampAtName", String.class,
+              Timestamp.class);
+      methSetBinaryDoubleAtName =
+          oracleStatementClass.getMethod("setBinaryDoubleAtName", String.class,
+              double.class);
+      methSetObjectAtName =
+          oracleStatementClass.getMethod("setObjectAtName", String.class,
+              Object.class);
+      methSetBinaryFloatAtName =
+          oracleStatementClass.getMethod("setBinaryFloatAtName", String.class,
+              float.class);
+      methSetIntAtName =
+          oracleStatementClass.getMethod("setIntAtName", String.class,
+              int.class);
+
+      oracleResultSetClass = Class.forName("oracle.jdbc.OracleResultSet");
+      oracleDateClass = Class.forName("oracle.sql.DATE");
+      oracleConnectionClass = Class.forName("oracle.jdbc.OracleConnection");
+      oracleTypesClass = Class.forName("oracle.jdbc.OracleTypes");
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Problem getting Oracle JDBC methods via reflection.", e);
+    }
+  }
+
+  private OracleQueries() {
+  }
+
+  public static void setJdbcFetchSize(Connection connection,
+      Integer fetchSize) {
+    int fetchSizeInt =
+        OracleJdbcConnectorConstants.ORACLE_ROW_FETCH_SIZE_DEFAULT;
+    if(fetchSize != null && fetchSize.intValue() > 0) {
+      fetchSizeInt = fetchSize.intValue();
+    }
+    try {
+      Method methSetPrefetch =
+          oracleConnectionClass.getMethod("setDefaultRowPrefetch", int.class);
+      methSetPrefetch.invoke(connection, fetchSizeInt);
+
+      String msg =
+          "The Oracle connection has had its default row fetch size set to : "
+              + fetchSizeInt;
+      if (fetchSizeInt ==
+          OracleJdbcConnectorConstants.ORACLE_ROW_FETCH_SIZE_DEFAULT) {
+        LOG.debug(msg);
+      } else {
+        LOG.info(msg);
+      }
+    } catch (Exception ex) {
+      LOG.warn(
+          String
+              .format(
+                  "Unable to configure the DefaultRowPrefetch of the "
+                + "Oracle connection in %s.",
+                  OracleUtilities.getCurrentMethodName()), ex);
+    }
+
+  }
+
+  public static void setConnectionTimeZone(Connection connection,
+      String timeZone) {
+    String timeZoneStr = timeZone;
+    if(StringUtils.isEmpty(timeZoneStr)) {
+      timeZoneStr = "GMT";
+    }
+    TimeZone timeZoneObj = TimeZone.getTimeZone(timeZoneStr);
+    try {
+      Method methSession =
+          oracleConnectionClass.getMethod("setSessionTimeZone", String.class);
+      Method methDefault =
+          oracleConnectionClass.getMethod("setDefaultTimeZone", 
TimeZone.class);
+      methSession.invoke(connection, timeZoneObj.getID());
+      methDefault.invoke(connection, timeZoneObj);
+      TimeZone.setDefault(timeZoneObj);
+      DateTimeZone.setDefault(DateTimeZone.forTimeZone(timeZoneObj));
+      LOG.info("Session Time Zone set to " + timeZoneObj.getID());
+    } catch (Exception e) {
+      LOG.error("Error setting time zone: " + e.getMessage());
+    }
+  }
+
+  public static OracleTablePartitions getPartitions(Connection connection,
+      OracleTable table) throws SQLException {
+
+    OracleTablePartitions result = new OracleTablePartitions();
+
+    PreparedStatement statement =
+      connection
+        .prepareStatement("with"
+        + " partitions as"
+        + "   (select table_owner, table_name, partition_name"
+        +  "   from dba_tab_partitions"
+        + "   where"
+        + "   table_owner = ? and"
+        + "   table_name = ?),"
+        + " subpartitions as"
+        + "  (select table_owner, table_name, partition_name, 
subpartition_name"
+        + "  from dba_tab_subpartitions"
+        + "  where"
+        + "  table_owner = ? and"
+        + "  table_name = ?)"
+        + " select"
+        + "   partitions.partition_name,"
+        + "   subpartitions.subpartition_name"
+        + " from partitions left outer join subpartitions on"
+        + " (partitions.table_owner = subpartitions.table_owner"
+        + " and partitions.table_name = subpartitions.table_name"
+        + " and partitions.partition_name = subpartitions.partition_name)"
+        + " order by partition_name, subpartition_name");
+
+    statement.setString(1, table.getSchema());
+    statement.setString(2, table.getName());
+    statement.setString(3, table.getSchema());
+    statement.setString(4, table.getName());
+
+    ResultSet resultSet = statement.executeQuery();
+
+    OracleTablePartition partition = null;
+    while (resultSet.next()) {
+      String partitionName = resultSet.getString("partition_name");
+      String subPartitionName = resultSet.getString("subpartition_name");
+
+      if (subPartitionName != null &&  !("".equals(subPartitionName))) {
+        partition = new OracleTablePartition(subPartitionName, true);
+        result.add(partition);
+      } else {
+        if (partition == null || partition.isSubPartition()
+            || partition.getName() != partitionName) {
+          partition = new OracleTablePartition(partitionName, false);
+          result.add(partition);
+        }
+      }
+    }
+
+    resultSet.close();
+    statement.close();
+
+    return result;
+  }
+
+//  public static int getOracleDataObjectNumber(Connection connection,
+//      OracleTable table) throws SQLException {
+//
+//    PreparedStatement statement =
+//        connection.prepareStatement("SELECT data_object_id "
+//            + " FROM dba_objects" + " WHERE owner = ?" + " and object_name = 
?"
+//            + " and object_type = ?");
+//    statement.setString(1, table.getSchema());
+//    statement.setString(2, table.getName());
+//    statement.setString(3, "TABLE");
+//
+//    ResultSet resultSet = statement.executeQuery();
+//
+//    resultSet.next();
+//    int result = resultSet.getInt("data_object_id");
+//
+//    resultSet.close();
+//    statement.close();
+//
+//    return result;
+//  }
+//
+  private static String getPartitionBindVars(List<String> partitionList) {
+    String result = "";
+    for (int i = 1; i <= partitionList.size(); i++) {
+      result += (i > 1) ? "," : "";
+      result += ":part" + i;
+    }
+    return result;
+  }
+
+  private static void bindPartitionBindVars(PreparedStatement statement,
+      List<String> partitionList) throws SQLException {
+    int i = 0;
+    for (String partition : partitionList) {
+      i++;
+      OracleQueries.setStringAtName(statement, "part" + i, partition);
+    }
+  }
+
+  public static List<OracleDataChunkPartition>
+      getOracleDataChunksPartition(Connection connection, OracleTable table,
+          List<String> partitionList) throws SQLException {
+    List<OracleDataChunkPartition> result =
+        new ArrayList<OracleDataChunkPartition>();
+    String sql =
+        "SELECT "
+          + "  pl.partition_name, "
+          + "  pl.is_subpartition, "
+          + "  s.blocks "
+          + "FROM "
+          + "  (SELECT tp.table_owner, "
+          + "    tp.table_name, "
+          + "    NVL(tsp.subpartition_name,tp.partition_name) partition_name, "
+          + "    nvl2(tsp.subpartition_name,1,0) is_subpartition "
+          + "  FROM dba_tab_partitions tp, "
+          + "    dba_tab_subpartitions tsp "
+          + "  WHERE tp.table_owner     = :table_owner"
+          + "  AND tp.table_name        = :table_name"
+          + "  AND tsp.table_owner(+)   =tp.table_owner "
+          + "  AND tsp.table_name(+)    =tp.table_name "
+          + "  AND tsp.partition_name(+)=tp.partition_name ";
+
+    if (partitionList != null && partitionList.size() > 0) {
+      sql +=
+          " AND tp.partition_name IN (" + getPartitionBindVars(partitionList)
+              + ") ";
+    }
+
+    sql += "  ) pl, dba_tables t, dba_segments s "
+        + "WHERE t.owner=pl.table_owner "
+        + "AND t.table_name=pl.table_name "
+        + "AND ( "
+        + "     (t.iot_type='IOT' AND (s.owner,s.segment_name)= "
+        + "                          (SELECT c.index_owner,c.index_name "
+        + "                             FROM dba_constraints c "
+        + "                            WHERE c.owner=pl.table_owner "
+        + "                              AND c.table_name=pl.table_name "
+        + "                              AND c.constraint_type='P')) "
+        + "     OR (t.iot_type IS NULL "
+        + "         AND s.owner=t.owner "
+        + "         AND s.segment_name=t.table_name) "
+        + "    ) "
+        + "AND s.partition_name=pl.partition_name";
+
+    PreparedStatement statement = connection.prepareStatement(sql);
+    OracleQueries.setStringAtName(statement, "table_owner", table
+        .getSchema());
+    OracleQueries.setStringAtName(statement, "table_name", table
+        .getName());
+
+    if (partitionList != null && partitionList.size() > 0) {
+      bindPartitionBindVars(statement, partitionList);
+    }
+
+    LOG.debug(String.format("%s SQL Query =\n%s", OracleUtilities
+        .getCurrentMethodName(), sql.replace(":table_owner", table.getSchema())
+        .replace(":table_name", table.getName())));
+
+    ResultSet resultSet = statement.executeQuery();
+
+    while (resultSet.next()) {
+      OracleDataChunkPartition dataChunk =
+          new OracleDataChunkPartition(resultSet
+              .getString("partition_name"), resultSet
+              .getBoolean("is_subpartition"), resultSet.getLong("blocks"));
+      result.add(dataChunk);
+    }
+    resultSet.close();
+    statement.close();
+    return result;
+  }
+
+  public static List<OracleDataChunkExtent> getOracleDataChunksExtent(
+      Connection connection, OracleTable table,
+      List<String> partitionList, long numberOfChunksPerOracleDataFile)
+      throws SQLException {
+
+    List<OracleDataChunkExtent> result =
+        new ArrayList<OracleDataChunkExtent>();
+
+    String sql =
+        "SELECT data_object_id, "
+          + "file_id, "
+          + "relative_fno, "
+          + "file_batch, "
+          + "MIN (start_block_id) start_block_id, "
+          + "MAX (end_block_id) end_block_id, "
+          + "SUM (blocks) blocks "
+          + "FROM (SELECT o.data_object_id, "
+          + "e.file_id, "
+          + "e.relative_fno, "
+          + "e.block_id start_block_id, "
+          + "e.block_id + e.blocks - 1 end_block_id, "
+          + "e.blocks, "
+          + "CEIL ( "
+          + "   SUM ( "
+          + "      e.blocks) "
+          + "   OVER (PARTITION BY o.data_object_id, e.file_id "
+          + "         ORDER BY e.block_id ASC) "
+          + "   / (SUM (e.blocks) "
+          + "         OVER (PARTITION BY o.data_object_id, e.file_id) "
+          + "      / :numchunks)) "
+          + "   file_batch "
+          + "FROM dba_extents e, dba_objects o, dba_tab_subpartitions tsp "
+          + "WHERE     o.owner = :owner "
+          + "AND o.object_name = :object_name "
+          + "AND e.owner = :owner "
+          + "AND e.segment_name = :object_name "
+          + "AND o.owner = e.owner "
+          + "AND o.object_name = e.segment_name "
+          + "AND (o.subobject_name = e.partition_name "
+          + "     OR (o.subobject_name IS NULL AND e.partition_name IS NULL)) "
+          + "AND o.owner = tsp.table_owner(+) "
+          + "AND o.object_name = tsp.table_name(+) "
+          + "AND o.subobject_name = tsp.subpartition_name(+) ";
+
+    if (partitionList != null && partitionList.size() > 0) {
+      sql +=
+          " AND case when o.object_type='TABLE SUBPARTITION' then "
+          + "tsp.partition_name else o.subobject_name end IN ("
+              + getPartitionBindVars(partitionList) + ") ";
+    }
+
+    sql +=
+        ") " + "GROUP BY data_object_id, " + "         file_id, "
+            + "         relative_fno, " + "         file_batch "
+            + "ORDER BY data_object_id, " + "         file_id, "
+            + "         relative_fno, " + "         file_batch";
+
+    PreparedStatement statement = connection.prepareStatement(sql);
+    OracleQueries.setLongAtName(statement, "numchunks",
+        numberOfChunksPerOracleDataFile);
+    OracleQueries.setStringAtName(statement, "owner", table.getSchema());
+    OracleQueries.setStringAtName(statement, "object_name", table
+        .getName());
+
+    if (partitionList != null && partitionList.size() > 0) {
+      bindPartitionBindVars(statement, partitionList);
+    }
+
+    LOG.debug(String.format("%s SQL Query =\n%s", OracleUtilities
+        .getCurrentMethodName(), sql.replace(":numchunks",
+        Long.toString(numberOfChunksPerOracleDataFile)).replace(":owner",
+        table.getSchema()).replace(":object_name", table.getName())));
+
+    ResultSet resultSet = statement.executeQuery();
+
+    while (resultSet.next()) {
+      int fileId = resultSet.getInt("relative_fno");
+      int fileBatch = resultSet.getInt("file_batch");
+      String dataChunkId =
+          OracleUtilities.generateDataChunkId(fileId, fileBatch);
+      OracleDataChunkExtent dataChunk =
+          new OracleDataChunkExtent(dataChunkId, resultSet
+              .getInt("data_object_id"), resultSet.getInt("relative_fno"),
+              resultSet.getLong("start_block_id"), resultSet
+                  .getLong("end_block_id"));
+      result.add(dataChunk);
+    }
+
+    resultSet.close();
+    statement.close();
+
+    return result;
+  }
+
+//  private static void trace(String message) {
+//
+//    LOG.debug(message);
+//  }
+//
+  public static String getOracleObjectType(Connection connection,
+      OracleTable table) throws SQLException {
+
+    PreparedStatement statement =
+        connection.prepareStatement("SELECT object_type " + " FROM dba_objects"
+            + " WHERE owner = ?" + " and object_name = ?");
+    statement.setString(1, table.getSchema());
+    statement.setString(2, table.getName());
+
+    ResultSet resultSet = statement.executeQuery();
+
+    String result = null;
+    if (resultSet.next()) {
+      result = resultSet.getString("object_type");
+    }
+
+    resultSet.close();
+    statement.close();
+
+    return result;
+  }
+
+  public static OracleVersion getOracleVersion(Connection connection)
+      throws SQLException {
+
+    String sql =
+        "SELECT \n"
+      + "  v.banner, \n"
+      + "  rtrim(v.version)      full_version, \n"
+      + "  rtrim(v.version_bit) version_bit, \n"
+      + "  SUBSTR(v.version, 1, INSTR(v.version, '.', 1, 1)-1) major, \n"
+      + "  SUBSTR(v.version, INSTR(v.version, '.', 1, 1) + 1, "
+      + "  INSTR(v.version, '.', 1, 2) - INSTR(v.version, '.', 1, 1) - 1) "
+      + "    minor, \n"
+      + "  SUBSTR(v.version, INSTR(v.version, '.', 1, 2) + 1, "
+      + "  INSTR(v.version, '.', 1, 3) - INSTR(v.version, '.', 1, 2) - 1) "
+      + "    version, \n"
+      + "  SUBSTR(v.version, INSTR(v.version, '.', 1, 3) + 1, "
+      + "  INSTR(v.version, '.', 1, 4) - INSTR(v.version, '.', 1, 3) - 1) "
+      + "    patch, \n"
+      + "  DECODE(instr(v.banner, '64bit'), 0, 'False', 'True') isDb64bit, \n"
+      + "  DECODE(instr(b.banner, 'HPUX'), 0, 'False', 'True') isHPUX \n"
+      + "FROM (SELECT rownum row_num, \n"
+      + "   banner,\n"
+      + "   SUBSTR(SUBSTR(banner,INSTR(banner,'Release ')+8), 1) 
version_bit,\n"
+      + "   SUBSTR(SUBSTR(banner,INSTR(banner,'Release ')+8), 1,\n"
+      + "    INSTR(SUBSTR(banner,INSTR(banner,'Release ')+8),' ')) version\n"
+      + "FROM v$version\n" + "  WHERE banner LIKE 'Oracle%'\n"
+      + "     OR banner LIKE 'Personal Oracle%') v,\n" + "v$version b\n"
+      + "  WHERE v.row_num = 1\n" + "  and b.banner like 'TNS%'\n";
+
+    Statement statement = connection.createStatement();
+    ResultSet resultSet = statement.executeQuery(sql);
+    resultSet.next();
+    OracleVersion result =
+        new OracleVersion(resultSet.getInt("major"), resultSet.getInt("minor"),
+            resultSet.getInt("version"), resultSet.getInt("patch"), resultSet
+                .getString("banner"));
+
+    resultSet.close();
+    statement.close();
+
+    return result;
+  }
+
+  public static List<OracleTable> getTables(Connection connection)
+      throws SQLException {
+
+    return getTables(connection, null, null, TableNameQueryType.Equals);
+  }
+
+  private enum GetTablesOptions {
+    Owner, Table
+  }
+
+  private enum TableNameQueryType {
+    Equals, Like
+  }
+
+//  public static List<OracleTable>
+//      getTables(Connection connection, String owner) throws SQLException {
+//
+//    return getTables(connection, owner, null, TableNameQueryType.Equals);
+//  }
+
+  public static OracleTable getTable(Connection connection, String owner,
+      String tableName) throws SQLException {
+
+    List<OracleTable> tables =
+        getTables(connection, owner, tableName, TableNameQueryType.Equals);
+    if (tables.size() > 0) {
+      return tables.get(0);
+    }
+
+    return null;
+  }
+
+  public static List<OracleTable> getTablesWithTableNameLike(
+      Connection connection, String owner, String tableNameLike)
+      throws SQLException {
+
+    return getTables(connection, owner, tableNameLike, 
TableNameQueryType.Like);
+  }
+
+  private static List<OracleTable> getTables(Connection connection,
+      String owner, String tableName, TableNameQueryType tableNameQueryType)
+      throws SQLException {
+
+    EnumSet<GetTablesOptions> options = EnumSet.noneOf(GetTablesOptions.class);
+
+    if (owner != null && !owner.isEmpty()) {
+      options.add(GetTablesOptions.Owner);
+    }
+
+    if (tableName != null && !tableName.isEmpty()) {
+      options.add(GetTablesOptions.Table);
+    }
+
+    String sql =
+        "SELECT owner, table_name " + " FROM dba_tables" + " %s %s %s %s "
+            + " ORDER BY owner, table_name";
+
+    String tableComparitor = null;
+    switch (tableNameQueryType) {
+      case Equals:
+        tableComparitor = "=";
+        break;
+      case Like:
+        tableComparitor = "LIKE";
+        break;
+      default:
+        throw new RuntimeException("Operator not implemented.");
+    }
+
+    sql =
+        String.format(sql, options.isEmpty() ? "" : "WHERE", options
+            .contains(GetTablesOptions.Owner) ? "owner = ?" : "", options
+            .containsAll(EnumSet.of(GetTablesOptions.Owner,
+                GetTablesOptions.Table)) ? "AND" : "", options
+            .contains(GetTablesOptions.Table) ? String.format(
+            "table_name %s ?", tableComparitor) : "");
+
+    PreparedStatement statement = connection.prepareStatement(sql);
+
+    if (options.containsAll(EnumSet.of(GetTablesOptions.Owner,
+        GetTablesOptions.Table))) {
+      statement.setString(1, owner);
+      statement.setString(2, tableName);
+    } else {
+      if (options.contains(GetTablesOptions.Owner)) {
+        statement.setString(1, owner);
+      } else if (options.contains(GetTablesOptions.Table)) {
+        statement.setString(1, tableName);
+      }
+    }
+
+    ResultSet resultSet = statement.executeQuery();
+
+    ArrayList<OracleTable> result = new ArrayList<OracleTable>();
+    while (resultSet.next()) {
+      result.add(new OracleTable(resultSet.getString("owner"), resultSet
+          .getString("table_name")));
+    }
+
+    resultSet.close();
+    statement.close();
+
+    return result;
+  }
+
+  public static List<String> getTableColumnNames(Connection connection,
+      OracleTable table) throws SQLException {
+
+    OracleTableColumns oracleTableColumns = getTableColumns(connection, table);
+    List<String> result = new ArrayList<String>(oracleTableColumns.size());
+
+    for (int idx = 0; idx < oracleTableColumns.size(); idx++) {
+      result.add(oracleTableColumns.get(idx).getName());
+    }
+
+    return result;
+  }
+
+  public static List<String> getToTableColumnNames(Connection connection,
+      OracleTable table, boolean onlyOraOopSupportedTypes,
+      boolean omitOraOopPseudoColumns) throws SQLException {
+
+    OracleTableColumns oracleTableColumns =
+        getToTableColumns(connection, table, onlyOraOopSupportedTypes,
+            omitOraOopPseudoColumns);
+
+    List<String> result = new ArrayList<String>(oracleTableColumns.size());
+
+    for (int idx = 0; idx < oracleTableColumns.size(); idx++) {
+      result.add(oracleTableColumns.get(idx).getName());
+    }
+
+    return result;
+
+  }
+
+  public static List<String> getFromTableColumnNames(Connection connection,
+      OracleTable table, boolean omitLobAndLongColumnsDuringImport,
+      boolean onlyOraOopSupportedTypes) throws SQLException {
+
+    OracleTableColumns oracleTableColumns =
+        getFromTableColumns(connection, table,
+            omitLobAndLongColumnsDuringImport, onlyOraOopSupportedTypes);
+
+    List<String> result = new ArrayList<String>(oracleTableColumns.size());
+
+    for (int idx = 0; idx < oracleTableColumns.size(); idx++) {
+      result.add(oracleTableColumns.get(idx).getName());
+    }
+
+    return result;
+
+  }
+
+  private static OracleTableColumns getTableColumns(Connection connection,
+      OracleTable table, boolean omitLobColumns, String dataTypesClause,
+      HashSet<String> columnNamesToOmit) throws SQLException {
+
+    String sql =
+        "SELECT column_name, data_type " + " FROM dba_tab_columns"
+            + " WHERE owner = ?" + " and table_name = ?" + " %s"
+            + " ORDER BY column_id";
+
+    sql =
+        String.format(sql, dataTypesClause == null ? "" : " and "
+            + dataTypesClause);
+
+    LOG.debug(String.format("%s : sql = \n%s", OracleUtilities
+        .getCurrentMethodName(), sql));
+
+    OracleTableColumns result = new OracleTableColumns();
+    PreparedStatement statement = connection.prepareStatement(sql);
+    statement.setString(1, getTableSchema(connection, table));
+    statement.setString(2, table.getName());
+
+    ResultSet resultSet = statement.executeQuery();
+
+    while (resultSet.next()) {
+
+      String columnName = resultSet.getString("column_name");
+
+      if (columnNamesToOmit != null) {
+        if (columnNamesToOmit.contains(columnName)) {
+          continue;
+        }
+      }
+
+      result.add(new OracleTableColumn(columnName, resultSet
+          .getString("data_type")));
+    }
+
+    resultSet.close();
+    statement.close();
+
+    // Now get the actual JDBC data-types for these columns...
+    StringBuilder columnList = new StringBuilder();
+    for (int idx = 0; idx < result.size(); idx++) {
+      if (idx > 0) {
+        columnList.append(",");
+      }
+      columnList.append(result.get(idx).getName());
+    }
+    sql =
+        String.format("SELECT %s FROM %s WHERE 0=1", columnList.toString(),
+            table.toString());
+    Statement statementDesc = connection.createStatement();
+    ResultSet resultSetDesc = statementDesc.executeQuery(sql);
+    ResultSetMetaData metaData = resultSetDesc.getMetaData();
+    for (int idx = 0; idx < metaData.getColumnCount(); idx++) {
+      result.get(idx).setOracleType(metaData.getColumnType(idx + 1)); // <- 
JDBC
+                                                                    // is
+                                                                    // 1-based
+    }
+    resultSetDesc.close();
+    statementDesc.close();
+
+    return result;
+  }
+
+  public static OracleTableColumns getTableColumns(Connection connection,
+      OracleTable table) throws SQLException {
+
+    return getTableColumns(connection, table, false, null // <- dataTypesClause
+        , null); // <-columnNamesToOmit
+  }
+
+  public static OracleTableColumns getToTableColumns(Connection connection,
+      OracleTable table, boolean onlyOraOopSupportedTypes,
+      boolean omitOraOopPseudoColumns) throws SQLException {
+    String dataTypesClause = "";
+    HashSet<String> columnNamesToOmit = null;
+    if (onlyOraOopSupportedTypes) {
+      dataTypesClause = OracleJdbcConnectorConstants.
+          SUPPORTED_EXPORT_ORACLE_DATA_TYPES_CLAUSE;
+    }
+    if (omitOraOopPseudoColumns) {
+      if (columnNamesToOmit == null) {
+        columnNamesToOmit = new HashSet<String>();
+      }
+      columnNamesToOmit.add(OracleJdbcConnectorConstants.
+          COLUMN_NAME_EXPORT_PARTITION);
+      columnNamesToOmit.add(OracleJdbcConnectorConstants.
+          COLUMN_NAME_EXPORT_SUBPARTITION);
+      columnNamesToOmit.add(OracleJdbcConnectorConstants.
+          COLUMN_NAME_EXPORT_MAPPER_ROW);
+    }
+    return getTableColumns(connection, table,
+        false, dataTypesClause, columnNamesToOmit);
+  }
+
+  public static OracleTableColumns getFromTableColumns(Connection connection,
+      OracleTable table, boolean omitLobAndLongColumnsDuringImport,
+      boolean onlyOraOopSupportedTypes) throws SQLException {
+    String dataTypesClause = "";
+    HashSet<String> columnNamesToOmit = null;
+    if (onlyOraOopSupportedTypes) {
+      dataTypesClause = OracleJdbcConnectorConstants.
+          SUPPORTED_IMPORT_ORACLE_DATA_TYPES_CLAUSE;
+
+      if (omitLobAndLongColumnsDuringImport) {
+        LOG.info("LOB and LONG columns are being omitted from the Import.");
+        dataTypesClause =
+            " DATA_TYPE not in ('BLOB', 'CLOB', 'NCLOB', 'LONG') and "
+                + dataTypesClause;
+      }
+    }
+    return getTableColumns(connection, table,
+        omitLobAndLongColumnsDuringImport, dataTypesClause, columnNamesToOmit);
+  }
+
+  public static List<OracleActiveInstance> getOracleActiveInstances(
+      Connection connection) throws SQLException {
+
+    // Returns null if there are no rows in v$active_instances - which 
indicates
+    // this Oracle database is not a RAC.
+    ArrayList<OracleActiveInstance> result = null;
+
+    Statement statement = connection.createStatement();
+    ResultSet resultSet =
+        statement.executeQuery("select inst_name from v$active_instances ");
+
+    while (resultSet.next()) {
+      String instName = resultSet.getString("inst_name");
+      String[] nameFragments = instName.split(":");
+
+      if (nameFragments.length != 2) {
+        throw new SQLException(
+            "Parsing Error: The inst_name column of v$active_instances does "
+            + "not contain two values separated by a colon.");
+      }
+
+      String hostName = nameFragments[0].trim();
+      String instanceName = nameFragments[1].trim();
+
+      if (hostName.isEmpty()) {
+        throw new SQLException(
+            "Parsing Error: The inst_name column of v$active_instances does "
+            + "not include a host name.");
+      }
+
+      if (instanceName.isEmpty()) {
+        throw new SQLException(
+            "Parsing Error: The inst_name column of v$active_instances does "
+            + "not include an instance name.");
+      }
+
+      OracleActiveInstance instance = new OracleActiveInstance();
+      instance.setHostName(hostName);
+      instance.setInstanceName(instanceName);
+
+      if (result == null) {
+        result = new ArrayList<OracleActiveInstance>();
+      }
+
+      result.add(instance);
+    }
+
+    resultSet.close();
+    statement.close();
+    return result;
+  }
+
+  public static String getCurrentOracleInstanceName(Connection connection)
+      throws SQLException {
+
+    String result = "";
+
+    Statement statement = connection.createStatement();
+    ResultSet resultSet =
+        statement.executeQuery("select instance_name from v$instance");
+
+    if (resultSet.next()) {
+      result = resultSet.getString("instance_name");
+    }
+
+    resultSet.close();
+    statement.close();
+    return result;
+  }
+
+  public static Object getSysDate(Connection connection) throws SQLException {
+    Statement statement = connection.createStatement();
+    ResultSet resultSet = statement.executeQuery("select sysdate from dual");
+
+    resultSet.next();
+    try {
+      Method method = oracleResultSetClass.getMethod("getDATE", int.class);
+      return method.invoke(resultSet, 1);
+    } catch (Exception e) {
+      if (e.getCause() instanceof SQLException) {
+        throw (SQLException) e.getCause();
+      } else {
+        throw new RuntimeException("Could not get sysdate", e);
+      }
+    } finally {
+      resultSet.close();
+      statement.close();
+    }
+  }
+
+  public static String oraDATEToString(Object date, String format) {
+    try {
+      Method dateMethod =
+          oracleDateClass.getMethod("toText", String.class, String.class);
+      return (String) dateMethod.invoke(date, format, null);
+    } catch (Exception e) {
+      throw new RuntimeException(String.format(
+          "Unable to convert the oracle.sql.DATE value \"%s\" to text.", date
+              .toString()), e);
+    }
+  }
+
+  public static Object oraDATEFromString(String date, String format) {
+    try {
+      Method dateMethod =
+          oracleDateClass.getMethod("fromText", String.class, String.class,
+              String.class);
+      return dateMethod.invoke(null, date, format, null);
+    } catch (Exception e) {
+      throw new RuntimeException(String
+          .format(
+              "Unable to convert the String value \"%s\" to oracle.sql.DATE.",
+              date), e);
+    }
+  }
+
+  public static Date oraDATEToDate(Object date) {
+    try {
+      Method dateMethod = oracleDateClass.getMethod("dateValue");
+      return (Date) dateMethod.invoke(date);
+    } catch (Exception e) {
+      throw new RuntimeException("Could not get sysdate", e);
+    }
+  }
+
+  public static String getSysTimeStamp(Connection connection)
+      throws SQLException {
+
+    Statement statement = connection.createStatement();
+    ResultSet resultSet =
+        statement.executeQuery("select systimestamp from dual");
+
+    resultSet.next();
+
+    try {
+      Method method = oracleResultSetClass.getMethod("getTIMESTAMP", 
int.class);
+      Object timestamp = method.invoke(resultSet, 1);
+      return timestamp.toString();
+    } catch (Exception e) {
+      if (e.getCause() instanceof SQLException) {
+        throw (SQLException) e.getCause();
+      } else {
+        throw new RuntimeException("Could not get sysdate", e);
+      }
+    } finally {
+      resultSet.close();
+      statement.close();
+    }
+  }
+
+  public static boolean isTableAnIndexOrganizedTable(Connection connection,
+      OracleTable table) throws SQLException {
+
+    /*
+     * http://ss64.com/orad/DBA_TABLES.html IOT_TYPE: If index-only table,then
+     * IOT_TYPE is IOT or IOT_OVERFLOW or IOT_MAPPING else NULL
+     */
+
+    boolean result = false;
+
+    PreparedStatement statement =
+        connection.prepareStatement("select iot_type " + "from dba_tables "
+            + "where owner = ? " + "and table_name = ?");
+    statement.setString(1, table.getSchema());
+    statement.setString(2, table.getName());
+    ResultSet resultSet = statement.executeQuery();
+
+    if (resultSet.next()) {
+      String iotType = resultSet.getString("iot_type");
+      result = iotType != null && !iotType.isEmpty();
+    }
+
+    resultSet.close();
+    statement.close();
+
+    return result;
+  }
+
+  public static void dropTable(Connection connection, OracleTable table)
+      throws SQLException {
+
+    String sql = String.format("DROP TABLE %s", table.toString());
+
+    Statement statement = connection.createStatement();
+    try {
+      statement.execute(sql);
+    } catch (SQLException ex) {
+      if (ex.getErrorCode() != 942) { // ORA-00942: table or view does not 
exist
+        throw ex;
+      }
+    }
+    statement.close();
+  }
+
+  public static void
+      exchangeSubpartition(Connection connection, OracleTable table,
+          String subPartitionName, OracleTable subPartitionTable)
+          throws SQLException {
+
+    Statement statement = connection.createStatement();
+    String sql =
+        String.format("ALTER TABLE %s EXCHANGE SUBPARTITION %s WITH TABLE %s",
+            table.toString(), subPartitionName, subPartitionTable.toString());
+    statement.execute(sql);
+    statement.close();
+  }
+
+  public static void createExportTableFromTemplate(Connection connection,
+      OracleTable newTable, String tableStorageClause,
+      OracleTable templateTable, boolean noLogging) throws SQLException {
+
+    String sql =
+        String.format("CREATE TABLE %s \n" + "%s %s \n" + "AS \n"
+            + "(SELECT * FROM %s WHERE 0=1)", newTable.toString(),
+            noLogging ? "NOLOGGING" : "", tableStorageClause, templateTable
+                .toString());
+
+    Statement statement = connection.createStatement();
+    statement.execute(sql);
+    statement.close();
+  }
+
+  private static Object oraDATEAddJulianDays(Object date, int julianDay,
+      int julianSec) {
+    try {
+      Constructor<?> dateCon = oracleDateClass.getConstructor(byte[].class);
+      Method dateBytes = oracleDateClass.getMethod("toBytes");
+      Object result = dateCon.newInstance(dateBytes.invoke(date));
+      Method dateAdd =
+          oracleDateClass.getMethod("addJulianDays", int.class, int.class);
+      result = dateAdd.invoke(result, julianDay, julianSec);
+      return result;
+    } catch (Exception e) {
+      throw new RuntimeException("Could not add days to date.", e);
+    }
+  }
+
+  public static void createExportTableFromTemplateWithPartitioning(
+      Connection connection, OracleTable newTable, String tableStorageClause,
+      OracleTable templateTable, boolean noLogging, String partitionName,
+      Object jobDateTime, int numberOfMappers, String[] subPartitionNames)
+      throws SQLException {
+
+    String dateFormat = "yyyy-mm-dd hh24:mi:ss";
+
+    Object partitionBound =
+        OracleQueries.oraDATEAddJulianDays(jobDateTime, 0, 1);
+
+    String partitionBoundStr =
+        OracleQueries.oraDATEToString(partitionBound, dateFormat);
+
+    StringBuilder subPartitions = new StringBuilder();
+    for (int idx = 0; idx < numberOfMappers; idx++) {
+      if (idx > 0) {
+        subPartitions.append(",");
+      }
+
+      subPartitions.append(String.format(" SUBPARTITION %s VALUES (%d)",
+          subPartitionNames[idx], idx));
+    }
+
+    String sql =
+        String.format(
+            "CREATE TABLE %s \n" + "%s %s \n" + "PARTITION BY RANGE (%s) \n"
+                + "SUBPARTITION BY LIST (%s) \n" + "(PARTITION %s \n"
+                + "VALUES LESS THAN (to_date('%s', '%s')) \n" + "( %s ) \n"
+                + ") \n" + "AS \n"
+                + "(SELECT t.*, sysdate %s, 0 %s, 0 %s FROM %s t \n"
+                + "WHERE 0=1)", newTable.toString(), noLogging ? "NOLOGGING"
+                : "", tableStorageClause,
+            OracleJdbcConnectorConstants.COLUMN_NAME_EXPORT_PARTITION,
+            OracleJdbcConnectorConstants.COLUMN_NAME_EXPORT_SUBPARTITION,
+            partitionName, partitionBoundStr, dateFormat,
+            subPartitions.toString(),
+            OracleJdbcConnectorConstants.COLUMN_NAME_EXPORT_PARTITION,
+            OracleJdbcConnectorConstants.COLUMN_NAME_EXPORT_SUBPARTITION,
+            OracleJdbcConnectorConstants.COLUMN_NAME_EXPORT_MAPPER_ROW,
+            templateTable.toString());
+
+    LOG.debug(String.format("SQL generated by %s:\n%s", OracleUtilities
+        .getCurrentMethodName(), sql));
+
+    try {
+
+      // Create the main export table...
+      PreparedStatement preparedStatement = connection.prepareStatement(sql);
+      preparedStatement.execute(sql);
+      preparedStatement.close();
+    } catch (SQLException ex) {
+      LOG.error(String
+          .format(
+              "The error \"%s\" was encountered when executing the following "
+              + "SQL statement:\n%s",
+              ex.getMessage(), sql));
+      throw ex;
+    }
+  }
+
+  public static void createExportTableForMapper(Connection connection,
+      OracleTable table, String tableStorageClause, OracleTable templateTable,
+      boolean addOraOopPartitionColumns) throws SQLException {
+
+    String sql = "";
+    try {
+
+      // Create the N tables to be used by the mappers...
+      Statement statement = connection.createStatement();
+      if (addOraOopPartitionColumns) {
+        sql =
+            String.format("CREATE TABLE %s \n" + "NOLOGGING %s \n" + "AS \n"
+                + "(SELECT t.*, SYSDATE %s, 0 %s, 0 %s FROM %s t WHERE 0=1)",
+                table.toString(), tableStorageClause,
+                OracleJdbcConnectorConstants.COLUMN_NAME_EXPORT_PARTITION,
+                OracleJdbcConnectorConstants.COLUMN_NAME_EXPORT_SUBPARTITION,
+                OracleJdbcConnectorConstants.COLUMN_NAME_EXPORT_MAPPER_ROW,
+                templateTable.toString());
+      } else {
+        sql =
+            String.format("CREATE TABLE %s \n" + "NOLOGGING %s \n" + "AS \n"
+                + "(SELECT * FROM %s WHERE 0=1)", table.toString(),
+                tableStorageClause, templateTable.toString());
+      }
+
+      LOG.info(String.format("SQL generated by %s:\n%s", OracleUtilities
+          .getCurrentMethodName(), sql));
+
+      statement.execute(sql);
+      statement.close();
+    } catch (SQLException ex) {
+      LOG.error(String
+          .format(
+              "The error \"%s\" was encountered when executing the following "
+              + "SQL statement:\n%s",
+              ex.getMessage(), sql));
+      throw ex;
+    }
+  }
+
+  public static void createMoreExportTablePartitions(Connection connection,
+      OracleTable table, String partitionName, Object jobDateTime,
+      String[] subPartitionNames) throws SQLException {
+
+    String dateFormat = "yyyy-mm-dd hh24:mi:ss";
+
+    Object partitionBound =
+        OracleQueries.oraDATEAddJulianDays(jobDateTime, 0, 1);
+    String partitionBoundStr =
+        OracleQueries.oraDATEToString(partitionBound, dateFormat);
+
+    StringBuilder subPartitions = new StringBuilder();
+    for (int idx = 0; idx < subPartitionNames.length; idx++) {
+      if (idx > 0) {
+        subPartitions.append(",");
+      }
+
+      subPartitions.append(String.format(" SUBPARTITION %s VALUES (%d)",
+          subPartitionNames[idx], idx));
+    }
+
+    String sql =
+        String.format("ALTER TABLE %s " + "ADD PARTITION %s "
+            + "VALUES LESS THAN (to_date('%s', '%s'))" + "( %s ) ", table
+            .toString(), partitionName, partitionBoundStr, dateFormat,
+            subPartitions.toString());
+
+    LOG.debug(String.format("SQL generated by %s:\n%s", OracleUtilities
+        .getCurrentMethodName(), sql));
+
+    try {
+      PreparedStatement preparedStatement = connection.prepareStatement(sql);
+      preparedStatement.execute(sql);
+      preparedStatement.close();
+
+    } catch (SQLException ex) {
+      LOG.error(String
+          .format(
+              "The error \"%s\" was encountered when executing the following "
+              + "SQL statement:\n%s",
+              ex.getMessage(), sql));
+      throw ex;
+    }
+  }
+
+  public static void mergeTable(Connection connection, OracleTable targetTable,
+      OracleTable sourceTable, String[] mergeColumnNames,
+      OracleTableColumns oracleTableColumns, Object oraOopSysDate,
+      int oraOopMapperId, boolean parallelizationEnabled) throws SQLException {
+
+    StringBuilder updateClause = new StringBuilder();
+    StringBuilder insertClause = new StringBuilder();
+    StringBuilder valuesClause = new StringBuilder();
+    for (int idx = 0; idx < oracleTableColumns.size(); idx++) {
+      OracleTableColumn oracleTableColumn = oracleTableColumns.get(idx);
+      String columnName = oracleTableColumn.getName();
+
+      if (insertClause.length() > 0) {
+        insertClause.append(",");
+      }
+      insertClause.append(String.format("target.%s", columnName));
+
+      if (valuesClause.length() > 0) {
+        valuesClause.append(",");
+      }
+      valuesClause.append(String.format("source.%s", columnName));
+
+      if (!OracleUtilities.stringArrayContains(mergeColumnNames, columnName,
+          true)) {
+
+        // If we're performing a merge, then the table is not partitioned. (If
+        // the table
+        // was partitioned, we'd be deleting and then inserting rows.)
+        if (!columnName.equalsIgnoreCase(OracleJdbcConnectorConstants.
+            COLUMN_NAME_EXPORT_PARTITION)
+            && !columnName.equalsIgnoreCase(OracleJdbcConnectorConstants.
+                COLUMN_NAME_EXPORT_SUBPARTITION)
+            && !columnName.equalsIgnoreCase(OracleJdbcConnectorConstants.
+                COLUMN_NAME_EXPORT_MAPPER_ROW)) {
+
+          if (updateClause.length() > 0) {
+            updateClause.append(",");
+          }
+          updateClause.append(String.format("target.%1$s = source.%1$s",
+              columnName));
+
+        }
+      }
+    }
+
+    String sourceClause = valuesClause.toString();
+
+    String sql =
+        String.format("MERGE %7$s INTO %1$s target \n"
+            + "USING (SELECT %8$s * FROM %2$s) source \n" + "  ON (%3$s) \n"
+            + "WHEN MATCHED THEN \n" + "  UPDATE SET %4$s \n"
+            + "WHEN NOT MATCHED THEN \n" + "  INSERT (%5$s) \n"
+            + "  VALUES (%6$s)", targetTable.toString(),
+            sourceTable.toString(),
+            generateUpdateKeyColumnsWhereClauseFragment(mergeColumnNames,
+                "target", "source"), updateClause.toString(), insertClause
+                .toString(), sourceClause,
+            parallelizationEnabled ? "/*+ append parallel(target) */" : "",
+            parallelizationEnabled ? "/*+parallel*/" : "");
+
+    LOG.info(String.format("Merge SQL statement:\n" + sql));
+
+    Statement statement = connection.createStatement();
+    ResultSet resultSet = statement.executeQuery(sql);
+    resultSet.close();
+    statement.close();
+  }
+
+  public static void updateTable(Connection connection,
+      OracleTable targetTable, OracleTable sourceTable,
+      String[] mergeColumnNames, OracleTableColumns oracleTableColumns,
+      Object oraOopSysDate, int oraOopMapperId, boolean parallelizationEnabled)
+      throws SQLException {
+
+    StringBuilder targetColumnsClause = new StringBuilder();
+    StringBuilder sourceColumnsClause = new StringBuilder();
+    for (int idx = 0; idx < oracleTableColumns.size(); idx++) {
+      OracleTableColumn oracleTableColumn = oracleTableColumns.get(idx);
+      String columnName = oracleTableColumn.getName();
+
+      if (targetColumnsClause.length() > 0) {
+        targetColumnsClause.append(",");
+      }
+      targetColumnsClause.append(String.format("a.%s", columnName));
+
+      if (sourceColumnsClause.length() > 0) {
+        sourceColumnsClause.append(",");
+      }
+      sourceColumnsClause.append(String.format("b.%s", columnName));
+    }
+
+    String sourceClause = sourceColumnsClause.toString();
+
+    sourceClause =
+        sourceClause.replaceAll(OracleJdbcConnectorConstants.
+            COLUMN_NAME_EXPORT_PARTITION,
+            String.format("to_date('%s', 'yyyy/mm/dd hh24:mi:ss')",
+                OracleQueries.oraDATEToString(oraOopSysDate,
+                    "yyyy/mm/dd hh24:mi:ss")));
+
+    sourceClause =
+        sourceClause.replaceAll(
+            OracleJdbcConnectorConstants.COLUMN_NAME_EXPORT_SUBPARTITION,
+            Integer.toString(oraOopMapperId));
+
+    String sql =
+        String.format("UPDATE %5$s %1$s a \n" + "SET \n" + "(%2$s) \n"
+            + "= (SELECT \n" + "%3$s \n" + "FROM %4$s b \n" + "WHERE %6$s) \n"
+            + "WHERE EXISTS (SELECT null FROM %4$s c " + "WHERE %7$s)",
+            targetTable.toString(), targetColumnsClause.toString(),
+            sourceClause, sourceTable.toString(),
+            parallelizationEnabled ? "/*+ parallel */" : "",
+            generateUpdateKeyColumnsWhereClauseFragment(mergeColumnNames, "b",
+                "a"), generateUpdateKeyColumnsWhereClauseFragment(
+                mergeColumnNames, "c", "a"));
+
+    LOG.info(String.format("Update SQL statement:\n" + sql));
+
+    Statement statement = connection.createStatement();
+    int rowsAffected = statement.executeUpdate(sql);
+
+    LOG.info(String.format(
+        "The number of rows affected by the update SQL was: %d", 
rowsAffected));
+
+    statement.close();
+  }
+
+  /**
+   * Whether new rows should be included in changes table or not.
+   */
+  public enum CreateExportChangesTableOptions {
+    OnlyRowsThatDiffer, RowsThatDifferPlusNewRows
+  }
+
+  public static int createExportChangesTable(Connection connection,
+      OracleTable tableToCreate, String tableToCreateStorageClause,
+      OracleTable tableContainingUpdates, OracleTable tableToBeUpdated,
+      String[] joinColumnNames, CreateExportChangesTableOptions options,
+      boolean parallelizationEnabled) throws SQLException {
+
+    List<String> columnNames =
+        getToTableColumnNames(connection, tableToBeUpdated
+            , true // <- onlyOraOopSupportedTypes
+            , false // <- omitOraOopPseudoColumns
+        );
+
+    StringBuilder columnClause = new StringBuilder(2 * columnNames.size());
+    for (int idx = 0; idx < columnNames.size(); idx++) {
+      if (idx > 0) {
+        columnClause.append(",");
+      }
+      columnClause.append("a." + columnNames.get(idx));
+    }
+
+    StringBuilder rowEqualityClause = new StringBuilder();
+    for (int idx = 0; idx < columnNames.size(); idx++) {
+      String columnName = columnNames.get(idx);
+
+      // We need to omit the OraOop pseudo columns from the SQL statement that
+      // compares the data in
+      // the two tables we're interested in. Otherwise, EVERY row will be
+      // considered to be changed,
+      // since the values in the pseudo columns will differ. (i.e.
+      // ORAOOP_EXPORT_SYSDATE will differ.)
+      if (columnName.equalsIgnoreCase(OracleJdbcConnectorConstants.
+              COLUMN_NAME_EXPORT_PARTITION)
+          || columnName.equalsIgnoreCase(OracleJdbcConnectorConstants.
+                 COLUMN_NAME_EXPORT_SUBPARTITION)
+          || columnName.equalsIgnoreCase(OracleJdbcConnectorConstants.
+                 COLUMN_NAME_EXPORT_MAPPER_ROW)) {
+        continue;
+      }
+
+      if (idx > 0) {
+        rowEqualityClause.append("OR");
+      }
+
+      rowEqualityClause.append(String.format("(a.%1$s <> b.%1$s "
+          + "OR (a.%1$s IS NULL AND b.%1$s IS NOT NULL) "
+          + "OR (a.%1$s IS NOT NULL AND b.%1$s IS NULL))", columnName));
+    }
+
+    String sqlJoin = null;
+    switch (options) {
+
+      case OnlyRowsThatDiffer:
+        sqlJoin = "";
+        break;
+
+      case RowsThatDifferPlusNewRows:
+        sqlJoin = "(+)"; // <- An outer-join will cause the "new" rows to be
+                         // included
+        break;
+
+      default:
+        throw new RuntimeException(String.format(
+            "Update %s to cater for the option \"%s\".", OracleUtilities
+                .getCurrentMethodName(), options.toString()));
+    }
+
+    String sql =
+        String.format("CREATE TABLE %1$s \n" + "NOLOGGING %8$s \n" + "%7$s \n"
+            + "AS \n " + "SELECT \n" + "%5$s \n" + "FROM %2$s a, %3$s b \n"
+            + "WHERE (%4$s) \n" + "AND ( \n" + "%6$s \n" + ")", tableToCreate
+            .toString(), tableContainingUpdates.toString(), tableToBeUpdated
+            .toString(), generateUpdateKeyColumnsWhereClauseFragment(
+            joinColumnNames, "a", "b", sqlJoin), columnClause.toString(),
+            rowEqualityClause.toString(), parallelizationEnabled ? "PARALLEL"
+                : "", tableToCreateStorageClause);
+
+    LOG.info(String.format("The SQL to create the changes-table is:\n%s", 
sql));
+
+    Statement statement = connection.createStatement();
+
+    long start = System.nanoTime();
+    statement.executeUpdate(sql);
+    double timeInSec = (System.nanoTime() - start) / Math.pow(10, 9);
+    LOG.info(String.format("Time spent creating change-table: %f sec.",
+        timeInSec));
+
+    String indexName = tableToCreate.toString().replaceAll("CHG", "IDX");
+    start = System.nanoTime();
+    statement.execute(String.format("CREATE INDEX %s ON %s (%s)", indexName,
+        tableToCreate.toString(), OracleUtilities
+            .stringArrayToCSV(joinColumnNames)));
+    timeInSec = (System.nanoTime() - start) / Math.pow(10, 9);
+    LOG.info(String.format("Time spent creating change-table index: %f sec.",
+        timeInSec));
+
+    int changeTableRowCount = 0;
+
+    ResultSet resultSet =
+        statement.executeQuery(String.format("select count(*) from %s",
+            tableToCreate.toString()));
+    resultSet.next();
+    changeTableRowCount = resultSet.getInt(1);
+    LOG.info(String.format("The number of rows in the change-table is: %d",
+        changeTableRowCount));
+
+    statement.close();
+    return changeTableRowCount;
+  }
+
+  public static void deleteRowsFromTable(Connection connection,
+      OracleTable tableToDeleteRowsFrom,
+      OracleTable tableContainingRowsToDelete, String[] joinColumnNames,
+      boolean parallelizationEnabled) throws SQLException {
+
+    String sql =
+        String.format("DELETE %4$s FROM %1$s a \n" + "WHERE EXISTS ( \n"
+            + "SELECT null FROM %3$s b WHERE \n" + "%2$s)",
+            tableToDeleteRowsFrom.toString(),
+            generateUpdateKeyColumnsWhereClauseFragment(joinColumnNames, "a",
+                "b"), tableContainingRowsToDelete.toString(),
+            parallelizationEnabled ? "/*+ parallel */" : "");
+
+    LOG.info(String.format("The SQL to delete rows from a table:\n%s", sql));
+
+    Statement statement = connection.createStatement();
+    int rowsAffected = statement.executeUpdate(sql);
+
+    LOG.info(String.format(
+        "The number of rows affected by the delete SQL was: %d", 
rowsAffected));
+
+    statement.close();
+  }
+
+  public static void insertRowsIntoExportTable(Connection connection,
+      OracleTable tableToInsertRowsInto,
+      OracleTable tableContainingRowsToInsert, Object oraOopSysDate,
+      int oraOopMapperId, boolean parallelizationEnabled) throws SQLException {
+
+    List<String> columnNames =
+        getTableColumnNames(connection, tableToInsertRowsInto);
+
+    StringBuilder columnClause =
+        new StringBuilder(2 + (2 * columnNames.size()));
+    for (int idx = 0; idx < columnNames.size(); idx++) {
+      if (idx > 0) {
+        columnClause.append(",");
+      }
+      columnClause.append(columnNames.get(idx));
+    }
+
+    String columnsClause = columnClause.toString();
+
+    String sql =
+        String.format("insert %4$s \n" + "into %1$s \n" + "select \n"
+            + "%2$s \n" + "from %3$s", tableToInsertRowsInto.toString(),
+            columnsClause, tableContainingRowsToInsert.toString(),
+            parallelizationEnabled ? "/*+ append parallel */" : "");
+
+    LOG.info(String.format(
+        "The SQL to insert rows from one table into another:\n%s", sql));
+
+    Statement statement = connection.createStatement();
+    ResultSet resultSet = statement.executeQuery(sql);
+    resultSet.close();
+    statement.close();
+  }
+
+  public static boolean doesIndexOnColumnsExist(Connection connection,
+      OracleTable oracleTable, String[] columnNames) throws SQLException {
+
+    // Attempts to find an index on the table that *starts* with the N column
+    // names passed.
+    // These columns can be in any order.
+
+    String columnNamesInClause =
+        OracleUtilities.stringArrayToCSV(columnNames, "'");
+
+    String sql =
+        String.format("SELECT b.index_name, \n"
+            + "  sum(case when b.column_name in (%1$s) then 1 end) num_cols \n"
+            + "FROM dba_indexes a, dba_ind_columns b \n" + "WHERE \n"
+            + "a.owner = b.index_owner \n"
+            + "AND a.index_name = b.index_name \n" + "AND b.table_owner = ? \n"
+            + "AND b.table_name = ? \n" + "AND a.status = 'VALID' \n"
+            + "AND b.column_position <= ? \n" + "GROUP BY b.index_name \n"
+            + "HAVING sum(case when b.column_name in (%1$s) then 1 end) = ?",
+            columnNamesInClause);
+
+    PreparedStatement statement = connection.prepareStatement(sql);
+    statement.setString(1, oracleTable.getSchema());
+    statement.setString(2, oracleTable.getName());
+    statement.setInt(3, columnNames.length);
+    statement.setInt(4, columnNames.length);
+
+    LOG.debug(String.format("SQL to find an index on the columns %s:\n%s",
+        columnNamesInClause, sql));
+
+    ResultSet resultSet = statement.executeQuery();
+
+    boolean result = false;
+    if (resultSet.next()) {
+      LOG.debug(String
+          .format(
+              "The table %s has an index named %s starting with the column(s) "
+              + "%s (in any order).",
+              oracleTable.toString(), resultSet.getString("index_name"),
+              columnNamesInClause));
+      result = true;
+    }
+
+    resultSet.close();
+    statement.close();
+
+    return result;
+  }
+
+  private static String generateUpdateKeyColumnsWhereClauseFragment(
+      String[] joinColumnNames, String prefix1, String prefix2) {
+
+    return generateUpdateKeyColumnsWhereClauseFragment(joinColumnNames,
+        prefix1, prefix2, "");
+  }
+
+  private static String generateUpdateKeyColumnsWhereClauseFragment(
+      String[] joinColumnNames, String prefix1, String prefix2,
+      String sqlJoinOperator) {
+
+    StringBuilder result = new StringBuilder();
+    for (int idx = 0; idx < joinColumnNames.length; idx++) {
+      String joinColumnName = joinColumnNames[idx];
+      if (idx > 0) {
+        result.append(" AND ");
+      }
+      result.append(String.format("%1$s.%3$s = %2$s.%3$s %4$s", prefix1,
+          prefix2, joinColumnName, sqlJoinOperator));
+    }
+    return result.toString();
+  }
+
+  public static String getCurrentSchema(Connection connection)
+      throws SQLException {
+    String sql = "SELECT SYS_CONTEXT('USERENV','CURRENT_SCHEMA') FROM DUAL";
+
+    PreparedStatement statement = connection.prepareStatement(sql);
+
+    ResultSet resultSet = statement.executeQuery();
+
+    resultSet.next();
+    String result = resultSet.getString(1);
+
+    resultSet.close();
+    statement.close();
+
+    LOG.info("Current schema is: " + result);
+
+    return result;
+  }
+
+  public static String getTableSchema(Connection connection, OracleTable table)
+      throws SQLException {
+    if (table.getSchema() == null || table.getSchema().isEmpty()) {
+      return getCurrentSchema(connection);
+    } else {
+      return table.getSchema();
+    }
+  }
+
+  public static long getCurrentScn(Connection connection) throws SQLException {
+    String sql = "SELECT current_scn FROM v$database";
+    PreparedStatement statement = connection.prepareStatement(sql);
+    ResultSet resultSet = statement.executeQuery();
+
+    resultSet.next();
+    long result = resultSet.getLong(1);
+    resultSet.close();
+    statement.close();
+
+    return result;
+  }
+
+  public static void setLongAtName(PreparedStatement statement,
+      String bindName, long bindValue) throws SQLException {
+    try {
+      methSetLongAtName.invoke(statement, bindName, bindValue);
+    } catch (Exception e) {
+      if (e.getCause() instanceof SQLException) {
+        throw (SQLException) e.getCause();
+      } else {
+        throw new RuntimeException("Could not set bind variable", e);
+      }
+    }
+  }
+
+  public static void setBigDecimalAtName(PreparedStatement statement,
+      String bindName, BigDecimal bindValue) throws SQLException {
+    try {
+      methSetBigDecimalAtName.invoke(statement, bindName, bindValue);
+    } catch (Exception e) {
+      if (e.getCause() instanceof SQLException) {
+        throw (SQLException) e.getCause();
+      } else {
+        throw new RuntimeException("Could not set bind variable", e);
+      }
+    }
+  }
+
+  public static void setStringAtName(PreparedStatement statement,
+      String bindName, String bindValue) throws SQLException {
+    try {
+      methSetStringAtName.invoke(statement, bindName, bindValue);
+    } catch (Exception e) {
+      if (e.getCause() instanceof SQLException) {
+        throw (SQLException) e.getCause();
+      } else {
+        throw new RuntimeException("Could not set bind variable", e);
+      }
+    }
+  }
+
+  public static void setTimestampAtName(PreparedStatement statement,
+      String bindName, Timestamp bindValue) throws SQLException {
+    try {
+      methSetTimestampAtName.invoke(statement, bindName, bindValue);
+    } catch (Exception e) {
+      if (e.getCause() instanceof SQLException) {
+        throw (SQLException) e.getCause();
+      } else {
+        throw new RuntimeException("Could not set bind variable", e);
+      }
+    }
+  }
+
+  public static void setBinaryDoubleAtName(PreparedStatement statement,
+      String bindName, double bindValue) throws SQLException {
+    try {
+      methSetBinaryDoubleAtName.invoke(statement, bindName, bindValue);
+    } catch (Exception e) {
+      if (e.getCause() instanceof SQLException) {
+        throw (SQLException) e.getCause();
+      } else {
+        throw new RuntimeException("Could not set bind variable", e);
+      }
+    }
+  }
+
+  public static void setObjectAtName(PreparedStatement statement,
+      String bindName, Object bindValue) throws SQLException {
+    try {
+      methSetObjectAtName.invoke(statement, bindName, bindValue);
+    } catch (Exception e) {
+      if (e.getCause() instanceof SQLException) {
+        throw (SQLException) e.getCause();
+      } else {
+        throw new RuntimeException("Could not set bind variable", e);
+      }
+    }
+  }
+
+  public static void setBinaryFloatAtName(PreparedStatement statement,
+      String bindName, float bindValue) throws SQLException {
+    try {
+      methSetBinaryFloatAtName.invoke(statement, bindName, bindValue);
+    } catch (Exception e) {
+      if (e.getCause() instanceof SQLException) {
+        throw (SQLException) e.getCause();
+      } else {
+        throw new RuntimeException("Could not set bind variable", e);
+      }
+    }
+  }
+
+//  public static void setIntAtName(PreparedStatement statement, String 
bindName,
+//      int bindValue) throws SQLException {
+//    try {
+//      methSetIntAtName.invoke(statement, bindName, bindValue);
+//    } catch (Exception e) {
+//      if (e.getCause() instanceof SQLException) {
+//        throw (SQLException) e.getCause();
+//      } else {
+//        throw new RuntimeException("Could not set bind variable", e);
+//      }
+//    }
+//  }
+
+  public static int getOracleType(String name) {
+    Integer result = ORACLE_TYPES.get(name);
+    if (result == null) {
+      synchronized (ORACLE_TYPES) {
+        try {
+          result = oracleTypesClass.getField(name).getInt(null);
+          ORACLE_TYPES.put(name, result);
+        } catch (Exception e) {
+          throw new RuntimeException("Invalid oracle type specified", e);
+        }
+      }
+    }
+    return result;
+  }
+
+  public static List<Column> getColDataTypes(Connection connection,
+      OracleTable table, List<String> colNames) throws SQLException {
+    List<Column> result = new ArrayList<Column>();
+    StringBuilder sb = new StringBuilder();
+    sb.append("SELECT ");
+    for (int idx = 0; idx < colNames.size(); idx++) {
+      if (idx > 0) {
+        sb.append(",");
+      }
+      sb.append(colNames.get(idx));
+    }
+    sb.append(String.format(" FROM %s WHERE 0=1", table.toString()));
+
+    String sql = sb.toString();
+    PreparedStatement statement = connection.prepareStatement(sql);
+    try {
+      ResultSetMetaData metadata = statement.getMetaData();
+      int numCols = metadata.getColumnCount();
+      for(int i = 1; i < numCols + 1; i++) {
+        String colName = metadata.getColumnName(i);
+        Column oracleColumn = OracleSqlTypesUtils.sqlTypeToSchemaType(
+            metadata.getColumnType(i), colName,
+            metadata.getPrecision(i), metadata.getScale(i));
+
+        result.add(oracleColumn);
+      }
+    } finally {
+      statement.close();
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleSqlTypesUtils.java
----------------------------------------------------------------------
diff --git 
a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleSqlTypesUtils.java
 
b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleSqlTypesUtils.java
new file mode 100644
index 0000000..e691557
--- /dev/null
+++ 
b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleSqlTypesUtils.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle.util;
+
+import java.sql.Types;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.schema.type.Binary;
+import org.apache.sqoop.schema.type.Bit;
+import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.schema.type.Date;
+import org.apache.sqoop.schema.type.DateTime;
+import org.apache.sqoop.schema.type.Decimal;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
+import org.apache.sqoop.schema.type.Text;
+import org.apache.sqoop.schema.type.Time;
+import org.apache.sqoop.schema.type.Unknown;
+
+public class OracleSqlTypesUtils {
+
+  private static final Logger LOG =
+      Logger.getLogger(OracleSqlTypesUtils.class);
+
+  public static Column sqlTypeToSchemaType(int sqlType, String columnName,
+      int precision, int scale) {
+    Column result = null;
+    switch (sqlType) {
+      case Types.SMALLINT:
+      case Types.TINYINT:
+        // only supports signed values
+        result = new FixedPoint(columnName, 2L, true);
+        break;
+      case Types.INTEGER:
+        // only supports signed values
+        result = new FixedPoint(columnName, 4L, true);
+        break;
+      case Types.BIGINT:
+        result = new FixedPoint(columnName, 8L, true);
+        break;
+
+      case Types.CLOB:
+      case Types.VARCHAR:
+      case Types.CHAR:
+      case Types.LONGVARCHAR:
+      case Types.NVARCHAR:
+      case Types.NCHAR:
+      case Types.LONGNVARCHAR:
+        result = new Text(columnName);
+        break;
+
+      case Types.DATE:
+        result = new Date(columnName);
+        break;
+
+      case Types.TIME:
+        result = new Time(columnName, true);
+        break;
+
+      case Types.TIMESTAMP:
+        result = new DateTime(columnName, true, false);
+        break;
+
+      case Types.FLOAT:
+      case Types.REAL:
+        result = new FloatingPoint(columnName, 4L);
+        break;
+      case Types.DOUBLE:
+        result = new FloatingPoint(columnName, 8L);
+        break;
+
+      case Types.NUMERIC:
+      case Types.DECIMAL:
+        result = new Decimal(columnName, precision, scale);
+        break;
+
+      case Types.BIT:
+      case Types.BOOLEAN:
+        result = new Bit(columnName);
+        break;
+
+      case Types.BINARY:
+      case Types.VARBINARY:
+      case Types.BLOB:
+      case Types.LONGVARBINARY:
+        result = new Binary(columnName);
+        break;
+
+      default:
+        result = new Unknown(columnName,(long)sqlType);
+    }
+
+    if (sqlType == OracleQueries.getOracleType("TIMESTAMP")) {
+      result = new DateTime(columnName, true, false);
+    }
+
+    if (sqlType == OracleQueries.getOracleType("TIMESTAMPTZ")) {
+      result = new DateTime(columnName, true, true);
+    }
+
+    if (sqlType == OracleQueries.getOracleType("TIMESTAMPLTZ")) {
+      result = new DateTime(columnName, true, true);
+    }
+
+    /*
+     * http://www.oracle.com/technology/sample_code/tech/java/sqlj_jdbc/files
+     * /oracle10g/ieee/Readme.html
+     *
+     * BINARY_DOUBLE is a 64-bit, double-precision floating-point number
+     * datatype. (IEEE 754) Each BINARY_DOUBLE value requires 9 bytes, 
including
+     * a length byte. A 64-bit double format number X is divided as sign s 
1-bit
+     * exponent e 11-bits fraction f 52-bits
+     *
+     * BINARY_FLOAT is a 32-bit, single-precision floating-point number
+     * datatype. (IEEE 754) Each BINARY_FLOAT value requires 5 bytes, including
+     * a length byte. A 32-bit single format number X is divided as sign s 
1-bit
+     * exponent e 8-bits fraction f 23-bits
+     */
+    if (sqlType == OracleQueries.getOracleType("BINARY_FLOAT")) {
+      // http://people.uncw.edu/tompkinsj/133/numbers/Reals.htm
+      result = new FloatingPoint(columnName, 4L);
+    }
+
+    if (sqlType == OracleQueries.getOracleType("BINARY_DOUBLE")) {
+      // http://people.uncw.edu/tompkinsj/133/numbers/Reals.htm
+      result = new FloatingPoint(columnName, 8L);
+    }
+
+    if (sqlType == OracleQueries.getOracleType("STRUCT")) {
+      // E.g. URITYPE
+      result = new Text(columnName);
+    }
+
+    if (result == null || result instanceof Unknown) {
+
+      // For constant values, refer to:
+      // http://oracleadvisor.com/documentation/oracle/database/11.2/
+      //   appdev.112/e13995/constant-values.html#oracle_jdbc
+
+      if (sqlType == OracleQueries.getOracleType("BFILE")
+          || sqlType == OracleQueries.getOracleType("NCLOB")
+          || sqlType == OracleQueries.getOracleType("NCHAR")
+          || sqlType == OracleQueries.getOracleType("NVARCHAR")
+          || sqlType == OracleQueries.getOracleType("ROWID")
+          || sqlType == OracleQueries.getOracleType("INTERVALYM")
+          || sqlType == OracleQueries.getOracleType("INTERVALDS")
+          || sqlType == OracleQueries.getOracleType("OTHER")) {
+        result = new Text(columnName);
+      }
+
+    }
+
+    if (result == null || result instanceof Unknown) {
+      LOG.warn(String.format("%s should be updated to cater for data-type: %d",
+          OracleUtilities.getCurrentMethodName(), sqlType));
+    }
+
+    return result;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleTable.java
----------------------------------------------------------------------
diff --git 
a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleTable.java
 
b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleTable.java
new file mode 100644
index 0000000..972e393
--- /dev/null
+++ 
b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleTable.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.jdbc.oracle.util;
+
+/**
+ * Contains details about an Oracle table.
+ */
+public class OracleTable {
+
+  private String schema;
+  private String name;
+
+  public String getSchema() {
+    return schema;
+  }
+
+  private void setSchema(String newSchema) {
+    this.schema = newSchema;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  private void setName(String newName) {
+    this.name = newName;
+  }
+
+  public OracleTable() {
+
+  }
+
+  public OracleTable(String schema, String name) {
+
+    setSchema(schema);
+    setName(name);
+  }
+
+  public OracleTable(String name) {
+    setName(name);
+  }
+
+  @Override
+  public String toString() {
+    String result =
+        (getSchema() == null || getSchema().isEmpty()) ? "" : "\""
+            + getSchema() + "\".";
+    result += "\"" + getName() + "\"";
+    return result;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleTableColumn.java
----------------------------------------------------------------------
diff --git 
a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleTableColumn.java
 
b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleTableColumn.java
new file mode 100644
index 0000000..ac6fb8b
--- /dev/null
+++ 
b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleTableColumn.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.jdbc.oracle.util;
+
+/**
+ * Contains details about a column in an Oracle table.
+ */
+public class OracleTableColumn {
+
+  private String name;
+  private String dataType; // <- i.e. The data_type from dba_tab_columns
+  private int oracleType;
+
+  public OracleTableColumn(String name, String dataType) {
+
+    this.setName(name);
+    this.setDataType(dataType);
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String newName) {
+    this.name = newName;
+  }
+
+  public String getDataType() {
+    return dataType;
+  }
+
+  public void setDataType(String newDataType) {
+    this.dataType = newDataType;
+  }
+
+  public int getOracleType() {
+    return oracleType;
+  }
+
+  public void setOracleType(int newOracleType) {
+    this.oracleType = newOracleType;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleTableColumns.java
----------------------------------------------------------------------
diff --git 
a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleTableColumns.java
 
b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleTableColumns.java
new file mode 100644
index 0000000..61fad5d
--- /dev/null
+++ 
b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleTableColumns.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.jdbc.oracle.util;
+
+import java.util.Iterator;
+
+/**
+ * Contains a list of Oracle columns.
+ */
+public class OracleTableColumns extends
+    OracleGenerics.ObjectList<OracleTableColumn> {
+
+  public OracleTableColumn findColumnByName(String columnName) {
+
+    OracleTableColumn result;
+
+    Iterator<OracleTableColumn> iterator = this.iterator();
+    while (iterator.hasNext()) {
+      result = iterator.next();
+      if (result.getName().equals(columnName)) {
+        return result;
+      }
+    }
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleTablePartition.java
----------------------------------------------------------------------
diff --git 
a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleTablePartition.java
 
b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleTablePartition.java
new file mode 100644
index 0000000..43e8396
--- /dev/null
+++ 
b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleTablePartition.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.jdbc.oracle.util;
+
+/**
+ * Contains details about a partition for an Oracle table.
+ */
+public class OracleTablePartition {
+
+  private String name;
+  private boolean isSubPartition;
+
+  public OracleTablePartition(String name, boolean isSubPartition) {
+    this.setName(name);
+    this.setSubPartition(isSubPartition);
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String newName) {
+    this.name = newName;
+  }
+
+  public boolean isSubPartition() {
+    return isSubPartition;
+  }
+
+  public void setSubPartition(boolean newIsSubPartition) {
+    this.isSubPartition = newIsSubPartition;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleTablePartitions.java
----------------------------------------------------------------------
diff --git 
a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleTablePartitions.java
 
b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleTablePartitions.java
new file mode 100644
index 0000000..6140d7b
--- /dev/null
+++ 
b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleTablePartitions.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.jdbc.oracle.util;
+
+import java.util.Iterator;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Contains a list of Oracle table partitions.
+ */
+public class OracleTablePartitions extends
+    OracleGenerics.ObjectList<OracleTablePartition> {
+
+  public OracleTablePartition findPartitionByName(String partitionName) {
+
+    OracleTablePartition result;
+
+    Iterator<OracleTablePartition> iterator = this.iterator();
+    while (iterator.hasNext()) {
+      result = iterator.next();
+      if (result.getName().equals(partitionName)) {
+        return result;
+      }
+    }
+    return null;
+  }
+
+  public OracleTablePartition findPartitionByRegEx(String regEx) {
+
+    OracleTablePartition result;
+
+    Pattern pattern = Pattern.compile(regEx);
+
+    Iterator<OracleTablePartition> iterator = this.iterator();
+    while (iterator.hasNext()) {
+      result = iterator.next();
+      Matcher matcher = pattern.matcher(result.getName());
+      if (matcher.find()) {
+        return result;
+      }
+    }
+    return null;
+  }
+
+}

Reply via email to