SQOOP-2595: Add Oracle connector to Sqoop 2 (David Robson via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/fa3c77b6 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/fa3c77b6 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/fa3c77b6 Branch: refs/heads/sqoop2 Commit: fa3c77b6a8352f68ec429164f48aee00ae2480d8 Parents: 2a9ae31 Author: Jarek Jarcec Cecho <[email protected]> Authored: Thu Nov 5 09:40:31 2015 -0800 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Thu Nov 5 09:40:31 2015 -0800 ---------------------------------------------------------------------- connector/connector-oracle-jdbc/pom.xml | 134 ++ .../oracle/OracleJdbcCommonInitializer.java | 477 +++++ .../jdbc/oracle/OracleJdbcConnector.java | 92 + .../oracle/OracleJdbcConnectorConstants.java | 493 +++++ .../oracle/OracleJdbcConnectorUpgrader.java | 43 + .../jdbc/oracle/OracleJdbcExtractor.java | 361 ++++ .../jdbc/oracle/OracleJdbcFromDestroyer.java | 36 + .../jdbc/oracle/OracleJdbcFromInitializer.java | 90 + .../connector/jdbc/oracle/OracleJdbcLoader.java | 615 +++++++ .../jdbc/oracle/OracleJdbcPartition.java | 183 ++ .../jdbc/oracle/OracleJdbcPartitioner.java | 252 +++ .../jdbc/oracle/OracleJdbcToDestroyer.java | 273 +++ .../jdbc/oracle/OracleJdbcToInitializer.java | 498 +++++ .../oracle/configuration/ConnectionConfig.java | 78 + .../oracle/configuration/FromJobConfig.java | 61 + .../configuration/FromJobConfiguration.java | 33 + .../oracle/configuration/LinkConfiguration.java | 34 + .../jdbc/oracle/configuration/ToJobConfig.java | 64 + .../configuration/ToJobConfiguration.java | 33 + .../jdbc/oracle/util/OracleActiveInstance.java | 44 + .../oracle/util/OracleConnectionFactory.java | 246 +++ .../jdbc/oracle/util/OracleDataChunk.java | 48 + .../jdbc/oracle/util/OracleDataChunkExtent.java | 109 ++ .../oracle/util/OracleDataChunkPartition.java | 85 + .../jdbc/oracle/util/OracleGenerics.java | 64 + .../jdbc/oracle/util/OracleJdbcUrl.java | 244 +++ .../jdbc/oracle/util/OracleQueries.java | 1721 ++++++++++++++++++ .../jdbc/oracle/util/OracleSqlTypesUtils.java | 176 ++ .../connector/jdbc/oracle/util/OracleTable.java | 68 + .../jdbc/oracle/util/OracleTableColumn.java | 59 + .../jdbc/oracle/util/OracleTableColumns.java | 43 + .../jdbc/oracle/util/OracleTablePartition.java | 50 + .../jdbc/oracle/util/OracleTablePartitions.java | 62 + .../jdbc/oracle/util/OracleUtilities.java | 1446 +++++++++++++++ .../jdbc/oracle/util/OracleVersion.java | 84 + .../oracle-jdbc-connector-config.properties | 136 ++ .../main/resources/sqoopconnector.properties | 18 + .../jdbc/oracle/TestOracleJdbcPartitioner.java | 102 ++ .../jdbc/oracle/TestOracleJdbcUrl.java | 249 +++ .../connector/jdbc/oracle/TestOracleTable.java | 42 + .../jdbc/oracle/TestOracleUtilities.java | 613 +++++++ .../OracleConnectionFactoryTest.java | 497 +++++ .../oracle/integration/OracleQueriesTest.java | 49 + .../jdbc/oracle/integration/OracleTestCase.java | 41 + connector/pom.xml | 1 + pom.xml | 11 + server/pom.xml | 5 + 47 files changed, 10163 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/pom.xml ---------------------------------------------------------------------- diff --git a/connector/connector-oracle-jdbc/pom.xml b/connector/connector-oracle-jdbc/pom.xml new file mode 100644 index 0000000..325790d --- /dev/null +++ b/connector/connector-oracle-jdbc/pom.xml @@ -0,0 +1,134 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.sqoop</groupId> + <artifactId>connector</artifactId> + <version>2.0.0-SNAPSHOT</version> + </parent> + + <groupId>org.apache.sqoop.connector</groupId> + <artifactId>sqoop-connector-oracle-jdbc</artifactId> + <name>Sqoop Oracle JDBC Connector</name> + + <dependencies> + + <dependency> + <groupId>org.apache.sqoop</groupId> + <artifactId>connector-sdk</artifactId> + </dependency> + + <!-- Test dependencies --> + <dependency> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.sqoop</groupId> + <artifactId>sqoop-common-test</artifactId> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + <finalName>sqoop</finalName> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <excludedGroups>oracle</excludedGroups> + + <excludes> + <exclude>**/integration/**</exclude> + </excludes> + </configuration> + <executions> + <execution> + <id>integration-test</id> + <goals> + <goal>test</goal> + </goals> + <phase>integration-test</phase> + <configuration> + <excludes> + <exclude>none</exclude> + </excludes> + <includes> + <include>**/integration/**</include> + </includes> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <profiles> + <profile> + <id>jdbc-oracle</id> + + <activation> + <property> + <name>jdbc.oracle</name> + </property> + </activation> + + <dependencies> + <dependency> + <groupId>com.oracle</groupId> + <artifactId>ojdbc14</artifactId> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + + <configuration> + <excludedGroups>none</excludedGroups> + </configuration> + </plugin> + </plugins> + </build> + </profile> + </profiles> + +</project> http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcCommonInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcCommonInitializer.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcCommonInitializer.java new file mode 100644 index 0000000..1fd95c0 --- /dev/null +++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcCommonInitializer.java @@ -0,0 +1,477 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc.oracle; + +import java.sql.Connection; +import java.sql.SQLException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.commons.lang.StringUtils; +import org.apache.log4j.Logger; +import org.apache.sqoop.common.MutableContext; +import org.apache.sqoop.connector.jdbc.oracle.configuration.ConnectionConfig; +import org.apache.sqoop.connector.jdbc.oracle.configuration.LinkConfiguration; +import org.apache.sqoop.connector.jdbc.oracle.util.OracleActiveInstance; +import org.apache.sqoop.connector.jdbc.oracle.util.OracleConnectionFactory; +import org.apache.sqoop.connector.jdbc.oracle.util.OracleJdbcUrl; +import org.apache.sqoop.connector.jdbc.oracle.util.OracleQueries; +import org.apache.sqoop.connector.jdbc.oracle.util.OracleTable; +import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities; +import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities.JdbcOracleThinConnectionParsingError; +import org.apache.sqoop.connector.jdbc.oracle.util.OracleVersion; +import org.apache.sqoop.job.etl.Initializer; +import org.apache.sqoop.job.etl.InitializerContext; +import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.Column; + +public class OracleJdbcCommonInitializer<JobConfiguration> extends Initializer<LinkConfiguration, JobConfiguration> { + + private static final Logger LOG = + Logger.getLogger(OracleJdbcCommonInitializer.class); + + protected Connection connection; + protected OracleTable table; + protected int numMappers = 8; + + public void connect(InitializerContext context, + LinkConfiguration linkConfiguration, + JobConfiguration jobConfiguration) throws SQLException { + connection = OracleConnectionFactory.makeConnection( + linkConfiguration.connectionConfig); + } + + @Override + public void initialize(InitializerContext context, + LinkConfiguration linkConfiguration, + JobConfiguration jobConfiguration) { + showUserTheOraOopWelcomeMessage(); + + try { + connect(context, linkConfiguration, jobConfiguration); + } catch (SQLException ex) { + throw new RuntimeException(String.format( + "Unable to connect to the Oracle database at %s\nError:%s", + linkConfiguration.connectionConfig.connectionString, ex + .getMessage()), ex); + } + + // Generate the "action" name that we'll assign to our Oracle sessions + // so that the user knows which Oracle sessions belong to OraOop... + //TODO: Get the job name + context.getContext().setString( + OracleJdbcConnectorConstants.ORACLE_SESSION_ACTION_NAME, + getOracleSessionActionName( + linkConfiguration.connectionConfig.username)); + + //TODO: Don't think this can be done anymore + //OraOopUtilities.appendJavaSecurityEgd(sqoopOptions.getConf()); + + // Get the Oracle database version... + try { + OracleVersion oracleVersion = + OracleQueries.getOracleVersion(connection); + LOG.info(String.format("Oracle Database version: %s", + oracleVersion.getBanner())); + } catch (SQLException ex) { + LOG.error("Unable to obtain the Oracle database version.", ex); + } + + // Generate the JDBC URLs to be used by each mapper... + setMapperConnectionDetails(linkConfiguration.connectionConfig, + context.getContext()); + + // Show the user the Oracle command that can be used to kill this + // OraOop + // job via Oracle... + showUserTheOracleCommandToKillOraOop(context.getContext()); + } + + @Override + public Schema getSchema(InitializerContext context, + LinkConfiguration linkConfiguration, + JobConfiguration jobConfiguration) { + try { + connect(context, linkConfiguration, jobConfiguration); + } catch (SQLException ex) { + throw new RuntimeException(String.format( + "Unable to connect to the Oracle database at %s\n" + + "Error:%s", linkConfiguration.connectionConfig.connectionString, + ex.getMessage()), ex); + } + + Schema schema = new Schema(table.toString()); + + try { + List<String> colNames = OracleQueries.getToTableColumnNames( + connection, table, true, true); + + List<Column> columnTypes = + OracleQueries.getColDataTypes(connection, table, colNames); + + for(Column column : columnTypes) { + schema.addColumn(column); + } + + return schema; + } catch(Exception e) { + throw new RuntimeException( + "Could not determine columns in Oracle Table.", e); + } + } + + private void showUserTheOraOopWelcomeMessage() { + + String msg1 = + String.format("Using %s", + OracleJdbcConnectorConstants.ORACLE_SESSION_MODULE_NAME); + + int longestMessage = msg1.length(); + + msg1 = StringUtils.rightPad(msg1, longestMessage); + + char[] asterisks = new char[longestMessage + 8]; + Arrays.fill(asterisks, '*'); + + String msg = + String.format("\n" + "%1$s\n" + "*** %2$s ***\n" + "%1$s", new String( + asterisks), msg1); + LOG.info(msg); + } + + private String getOracleSessionActionName(String jobName) { + + String timeStr = + (new SimpleDateFormat("yyyyMMddHHmmsszzz")).format(new Date()); + + String result = String.format("%s %s", jobName, timeStr); + + // NOTE: The "action" column of v$session is only a 32 character column. + // Therefore we need to ensure that the string returned by this + // method does not exceed 32 characters... + if (result.length() > 32) { + result = result.substring(0, 32).trim(); + } + + return result; + } + + private void setMapperConnectionDetails(ConnectionConfig connectionConfig, + MutableContext context) { + + // Query v$active_instances to get a list of all instances in the Oracle RAC + // (assuming this *could* be a RAC)... + List<OracleActiveInstance> activeInstances = null; + try { + activeInstances = + OracleQueries.getOracleActiveInstances(connection); + } catch (SQLException ex) { + throw new RuntimeException( + "An error was encountered when attempting to determine the " + + "configuration of the Oracle RAC.", + ex); + } + + if (activeInstances == null) { + LOG.info("This Oracle database is not a RAC."); + } else { + LOG.info("This Oracle database is a RAC."); + } + + // Is dynamic JDBC URL generation disabled?... + if (OracleUtilities.oracleJdbcUrlGenerationDisabled(connectionConfig)) { + LOG.info(String + .format( + "%s will not use dynamically generated JDBC URLs - this feature " + + "has been disabled.", + OracleJdbcConnectorConstants.CONNECTOR_NAME)); + return; + } + + boolean generateRacBasedJdbcUrls = false; + + // Decide whether this is a multi-instance RAC, and whether we need to do + // anything more... + if (activeInstances != null) { + generateRacBasedJdbcUrls = true; + + if (activeInstances.size() < OracleJdbcConnectorConstants. + MIN_NUM_RAC_ACTIVE_INSTANCES_FOR_DYNAMIC_JDBC_URLS) { + LOG.info(String.format( + "There are only %d active instances in the Oracle RAC. " + + "%s will not bother utilizing dynamically generated JDBC URLs.", + activeInstances.size(), + OracleJdbcConnectorConstants.CONNECTOR_NAME)); + generateRacBasedJdbcUrls = false; + } + } + + // E.g. jdbc:oracle:thin:@localhost.localdomain:1521:orcl + String jdbcConnectStr = connectionConfig.connectionString; + + // Parse the JDBC URL to obtain the port number for the TNS listener... + String jdbcHost = ""; + int jdbcPort = 0; + String jdbcSid = ""; + String jdbcService = ""; + String jdbcTnsName = ""; + try { + + OracleJdbcUrl oraOopJdbcUrl = new OracleJdbcUrl(jdbcConnectStr); + OracleUtilities.JdbcOracleThinConnection jdbcConnection = + oraOopJdbcUrl.parseJdbcOracleThinConnectionString(); + jdbcHost = jdbcConnection.getHost(); + jdbcPort = jdbcConnection.getPort(); + jdbcSid = jdbcConnection.getSid(); + jdbcService = jdbcConnection.getService(); + jdbcTnsName = jdbcConnection.getTnsName(); + } catch (JdbcOracleThinConnectionParsingError ex) { + LOG.info(String.format( + "Unable to parse the JDBC connection URL \"%s\" as a connection " + + "that uses the Oracle 'thin' JDBC driver.\n" + + "This problem prevents %s from being able to dynamically generate " + + "JDBC URLs that specify 'dedicated server connections' or spread " + + "mapper sessions across multiple Oracle instances.\n" + + "If the JDBC driver-type is 'OCI' (instead of 'thin'), then " + + "load-balancing should be appropriately managed automatically.", + jdbcConnectStr, OracleJdbcConnectorConstants.CONNECTOR_NAME, ex)); + return; + } + + if (generateRacBasedJdbcUrls) { + + // Retrieve the Oracle service name to use when connecting to the RAC... + String oracleServiceName = connectionConfig.racServiceName; + + // Generate JDBC URLs for each of the mappers... + if (!oracleServiceName.isEmpty()) { + if (!generateRacJdbcConnectionUrlsByServiceName(jdbcHost, jdbcPort, + oracleServiceName, connectionConfig, context)) { + throw new RuntimeException(String.format( + "Unable to connect to the Oracle database at %s " + + "via the service name \"%s\".", jdbcConnectStr, + oracleServiceName)); + } + } else { + generateJdbcConnectionUrlsByActiveInstance(activeInstances, jdbcPort, + connectionConfig, context); + } + } else { + generateJdbcConnectionUrlsByTnsnameSidOrService(jdbcHost, jdbcPort, + jdbcSid, jdbcService, jdbcTnsName, connectionConfig, context); + } + + } + + private boolean generateRacJdbcConnectionUrlsByServiceName(String hostName, + int port, String serviceName, ConnectionConfig connectionConfig, + MutableContext context) { + + boolean result = false; + String jdbcUrl = + OracleUtilities.generateOracleServiceNameJdbcUrl(hostName, port, + serviceName); + + if (testDynamicallyGeneratedOracleRacInstanceConnection(jdbcUrl, + connectionConfig.username, connectionConfig.password, + connectionConfig.jdbcProperties + , false // <- ShowInstanceSysTimestamp + , "" // <- instanceDescription + )) { + + LOG.info(String.format( + "%s will load-balance sessions across the Oracle RAC instances " + + "by connecting each mapper to the Oracle Service \"%s\".", + OracleJdbcConnectorConstants.CONNECTOR_NAME, serviceName)); + + // Now store these connection strings in such a way that each mapper knows + // which one to use... + for (int idxMapper = 0; idxMapper < numMappers; idxMapper++) { + storeJdbcUrlForMapper(idxMapper, jdbcUrl, context); + } + result = true; + } + return result; + } + + private void generateJdbcConnectionUrlsByTnsnameSidOrService(String hostName, + int port, String sid, String serviceName, String tnsName, + ConnectionConfig connectionConfig, MutableContext context) { + + String jdbcUrl = null; + if (tnsName != null && !tnsName.isEmpty()) { + jdbcUrl = OracleUtilities.generateOracleTnsNameJdbcUrl(tnsName); + } else if (sid != null && !sid.isEmpty()) { + jdbcUrl = OracleUtilities.generateOracleSidJdbcUrl(hostName, port, sid); + } else { + jdbcUrl = + OracleUtilities.generateOracleServiceNameJdbcUrl(hostName, port, + serviceName); + } + + // Now store these connection strings in such a way that each mapper knows + // which one to use... + for (int idxMapper = 0; idxMapper < numMappers; idxMapper++) { + storeJdbcUrlForMapper(idxMapper, jdbcUrl, context); + } + } + + private void + generateJdbcConnectionUrlsByActiveInstance( + List<OracleActiveInstance> activeInstances, int jdbcPort, + ConnectionConfig connectionConfig, MutableContext context) { + + // Generate JDBC URLs for each of the instances in the RAC... + ArrayList<OracleUtilities.JdbcOracleThinConnection> + jdbcOracleActiveThinConnections = + new ArrayList<OracleUtilities.JdbcOracleThinConnection>( + activeInstances.size()); + + for (OracleActiveInstance activeInstance : activeInstances) { + + OracleUtilities.JdbcOracleThinConnection + jdbcActiveInstanceThinConnection = + new OracleUtilities.JdbcOracleThinConnection( + activeInstance.getHostName(), + jdbcPort, activeInstance.getInstanceName(), "", ""); + + if (testDynamicallyGeneratedOracleRacInstanceConnection( + jdbcActiveInstanceThinConnection.toString(), + connectionConfig.username, + connectionConfig.password, connectionConfig.jdbcProperties, + true, activeInstance.getInstanceName())) { + jdbcOracleActiveThinConnections.add(jdbcActiveInstanceThinConnection); + } + } + + // If there are multiple JDBC URLs that work okay for the RAC, then we'll + // make use of them... + if (jdbcOracleActiveThinConnections.size() < OracleJdbcConnectorConstants. + MIN_NUM_RAC_ACTIVE_INSTANCES_FOR_DYNAMIC_JDBC_URLS) { + LOG.info(String + .format( + "%s will not attempt to load-balance sessions across instances " + + "of an Oracle RAC - as multiple JDBC URLs to the " + + "Oracle RAC could not be dynamically generated.", + OracleJdbcConnectorConstants.CONNECTOR_NAME)); + return; + } else { + StringBuilder msg = new StringBuilder(); + msg.append(String + .format( + "%s will load-balance sessions across the following instances of" + + "the Oracle RAC:\n", + OracleJdbcConnectorConstants.CONNECTOR_NAME)); + + for (OracleUtilities.JdbcOracleThinConnection thinConnection + : jdbcOracleActiveThinConnections) { + msg.append(String.format("\tInstance: %s \t URL: %s\n", + thinConnection.getSid(), thinConnection.toString())); + } + LOG.info(msg.toString()); + } + + // Now store these connection strings in such a way that each mapper knows + // which one to use... + int racInstanceIdx = 0; + OracleUtilities.JdbcOracleThinConnection thinUrl; + for (int idxMapper = 0; idxMapper < numMappers; idxMapper++) { + if (racInstanceIdx > jdbcOracleActiveThinConnections.size() - 1) { + racInstanceIdx = 0; + } + thinUrl = jdbcOracleActiveThinConnections.get(racInstanceIdx); + racInstanceIdx++; + storeJdbcUrlForMapper(idxMapper, thinUrl.toString(), context); + } + } + + private void storeJdbcUrlForMapper(int mapperIdx, String jdbcUrl, + MutableContext context) { + + // Now store these connection strings in such a way that each mapper knows + // which one to use... + String mapperJdbcUrlPropertyName = + OracleUtilities.getMapperJdbcUrlPropertyName(mapperIdx); + LOG.debug("Setting mapper url " + mapperJdbcUrlPropertyName + " = " + + jdbcUrl); + context.setString(mapperJdbcUrlPropertyName, jdbcUrl); + } + + private boolean testDynamicallyGeneratedOracleRacInstanceConnection( + String url, String userName, String password, + Map<String, String> jdbcProperties, + boolean showInstanceSysTimestamp, String instanceDescription) { + + boolean result = false; + + // Test the connection... + try { + Properties additionalProps = new Properties(); + if(jdbcProperties != null) { + additionalProps.putAll(jdbcProperties); + } + Connection testConnection = + OracleConnectionFactory.createOracleJdbcConnection( + OracleJdbcConnectorConstants.ORACLE_JDBC_DRIVER_CLASS, + url, userName, password, additionalProps); + + // Show the system time on each instance... + if (showInstanceSysTimestamp) { + LOG.info(String.format("\tDatabase time on %s is %s", + instanceDescription, OracleQueries + .getSysTimeStamp(testConnection))); + } + + testConnection.close(); + result = true; + } catch (SQLException ex) { + LOG.warn( + String + .format( + "The dynamically generated JDBC URL \"%s\" was unable to " + + "connect to an instance in the Oracle RAC.", + url), ex); + } + + return result; + } + + private void showUserTheOracleCommandToKillOraOop(MutableContext context) { + + String moduleName = + OracleJdbcConnectorConstants.ORACLE_SESSION_MODULE_NAME; + String actionName = context.getString( + OracleJdbcConnectorConstants.ORACLE_SESSION_ACTION_NAME); + + String msg = String.format( + "\nNote: This %s job can be killed via Oracle by executing the " + + "following statement:\n\tbegin\n" + + "\t\tfor row in (select sid,serial# from v$session where module='%s' " + + "and action='%s') loop\n" + + "\t\t\texecute immediate 'alter system kill session ''' || row.sid || " + + "',' || row.serial# || '''';\n" + + "\t\tend loop;\n" + "\tend;", + OracleJdbcConnectorConstants.CONNECTOR_NAME, moduleName, actionName); + LOG.info(msg); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnector.java ---------------------------------------------------------------------- diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnector.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnector.java new file mode 100644 index 0000000..ae0b9dc --- /dev/null +++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnector.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc.oracle; + +import java.util.Locale; +import java.util.ResourceBundle; + +import org.apache.sqoop.common.Direction; +import org.apache.sqoop.common.VersionInfo; +import org.apache.sqoop.connector.jdbc.oracle.configuration.FromJobConfiguration; +import org.apache.sqoop.connector.jdbc.oracle.configuration.ToJobConfiguration; +import org.apache.sqoop.connector.jdbc.oracle.configuration.LinkConfiguration; +import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader; +import org.apache.sqoop.connector.spi.SqoopConnector; +import org.apache.sqoop.job.etl.From; +import org.apache.sqoop.job.etl.To; + +public class OracleJdbcConnector extends SqoopConnector { + + private static final To TO = new To( + OracleJdbcToInitializer.class, + OracleJdbcLoader.class, + OracleJdbcToDestroyer.class); + + private static final From FROM = new From( + OracleJdbcFromInitializer.class, + OracleJdbcPartitioner.class, + OracleJdbcPartition.class, + OracleJdbcExtractor.class, + OracleJdbcFromDestroyer.class); + + @Override + public String getVersion() { + return VersionInfo.getBuildVersion(); + } + + @Override + public ResourceBundle getBundle(Locale locale) { + return ResourceBundle.getBundle( + OracleJdbcConnectorConstants.RESOURCE_BUNDLE_NAME, locale); + } + + @SuppressWarnings("rawtypes") + @Override + public Class getLinkConfigurationClass() { + return LinkConfiguration.class; + } + + @SuppressWarnings("rawtypes") + @Override + public Class getJobConfigurationClass(Direction jobType) { + switch (jobType) { + case FROM: + return FromJobConfiguration.class; + case TO: + return ToJobConfiguration.class; + default: + return null; + } + } + + @Override + public From getFrom() { + return FROM; + } + + @Override + public To getTo() { + return TO; + } + + @Override + public ConnectorConfigurableUpgrader getConfigurableUpgrader(String oldConnectorVersion) { + return new OracleJdbcConnectorUpgrader(); + } + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnectorConstants.java ---------------------------------------------------------------------- diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnectorConstants.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnectorConstants.java new file mode 100644 index 0000000..2215cf3 --- /dev/null +++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnectorConstants.java @@ -0,0 +1,493 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc.oracle; + +public final class OracleJdbcConnectorConstants { + + // Resource bundle name + public static final String RESOURCE_BUNDLE_NAME = + "oracle-jdbc-connector-config"; + + public static final String CONNECTOR_NAME = "Sqoop Oracle Connector"; + + // The string we want to pass to dbms_application_info.set_module() via the + // "module_name" parameter... + public static final String ORACLE_SESSION_MODULE_NAME = CONNECTOR_NAME; + + public static final String ORACLE_SESSION_ACTION_NAME = + "oracle.session.module.action"; + + //How many rows to pre-fetch when executing Oracle queries... + public static final int ORACLE_ROW_FETCH_SIZE_DEFAULT = 5000; + + // The name of the Oracle JDBC class... + public static final String ORACLE_JDBC_DRIVER_CLASS = + "oracle.jdbc.OracleDriver"; + + public static final String ORACLE_SESSION_INITIALIZATION_STATEMENTS_DEFAULT = + "alter session disable parallel query;" + + "alter session set \"_serial_direct_read\"=true;" + + "alter session set tracefile_identifier=oraoop;" + + "--alter session set events '10046 trace name context forever, level 8';"; + + + ///////////////////////////////////////////////////////////////////// + +// // Whether to log Oracle session statistics using Guy Harrison's jar file... +// public static final String ORAOOP_REPORT_SESSION_STATISTICS = +// "oraoop.report.session.statistics"; +// +// // Disables dynamic JDBC URL generation for each mapper... +// public static final String ORAOOP_JDBC_URL_VERBATIM = +// "oraoop.jdbc.url.verbatim"; +// +// // The name of the Oracle RAC service each mapper should connect to, via their +// // dynamically generated JDBC URL... +// public static final String ORAOOP_ORACLE_RAC_SERVICE_NAME = +// "oraoop.oracle.rac.service.name"; +// +// // The log4j log-level for OraOop... +// public static final String ORAOOP_LOGGING_LEVEL = "oraoop.logging.level"; +// +// // The file names for the configuration properties of OraOop... +// public static final String ORAOOP_SITE_TEMPLATE_FILENAME = +// "oraoop-site-template.xml"; +// public static final String ORAOOP_SITE_FILENAME = "oraoop-site.xml"; +// +// // A flag that indicates that the OraOop job has been cancelled. +// // E.g. An Oracle DBA killed our Oracle session. +// // public static final String ORAOOP_JOB_CANCELLED = "oraoop.job.cancelled"; +// + // The SYSDATE from the Oracle database when this OraOop job was started. + // This is used to generate unique names for partitions and temporary tables + // that we create during the job... + public static final String SQOOP_ORACLE_JOB_SYSDATE = + "sqoop.oracle.job.sysdate"; +// +// // The properties are used internally by OraOop to indicate the schema and +// // name of +// // the table being imported/exported... +// public static final String ORAOOP_TABLE_OWNER = "oraoop.table.owner"; +// public static final String ORAOOP_TABLE_NAME = "oraoop.table.name"; +// +// // Constants used to indicate the desired location of the WHERE clause within +// // the SQL generated by the record-reader. +// // E.g. A WHERE clause like "rownum <= 10" would want to be located so that +// // it had an impact on the total number of rows returned by the split; +// // as opposed to impacting the number of rows returned for each of the +// // unioned data-chunks within each split. +// public static final String ORAOOP_TABLE_IMPORT_WHERE_CLAUSE_LOCATION = +// "oraoop.table.import.where.clause.location"; +// +// +// // Reliably stores the number mappers requested for the sqoop map-reduce +// // job... +// public static final String ORAOOP_DESIRED_NUMBER_OF_MAPPERS = +// "oraoop.desired.num.mappers"; +// +// // The minimum number of mappers required for OraOop to accept the import +// // job... +// public static final String ORAOOP_MIN_IMPORT_MAPPERS = +// "oraoop.min.import.mappers"; +// public static final int MIN_NUM_IMPORT_MAPPERS_ACCEPTED_BY_ORAOOP = 2; +// +// // The minimum number of mappers required for OraOop to accept the export +// // job... +// public static final String ORAOOP_MIN_EXPORT_MAPPERS = +// "oraoop.min.export.mappers"; +// public static final int MIN_NUM_EXPORT_MAPPERS_ACCEPTED_BY_ORAOOP = 2; +// +// // The query used to fetch oracle data chunks... +// public static final String ORAOOP_ORACLE_DATA_CHUNKS_QUERY = +// "oraoop.oracle.data.chunks.query"; +// + // The minimum number of active instances in an Oracle RAC required for OraOop + // to use dynamically generated JDBC URLs... + public static final int MIN_NUM_RAC_ACTIVE_INSTANCES_FOR_DYNAMIC_JDBC_URLS = + 2; +// +// +// +// // OraOop does not require a "--split-by" column to be defined... +// public static final String TABLE_SPLIT_COLUMN_NOT_REQUIRED = "not-required"; +// + // The name of the data_chunk_id column the OraOop appends to each (import) + // query... + public static final String COLUMN_NAME_DATA_CHUNK_ID = "data_chunk_id"; +// +// // The hint that will be used on the SELECT statement for import jobs +// public static final String IMPORT_QUERY_HINT = "oraoop.import.hint"; +// +// // Pseudo-columns added to an partitioned export table (created by OraOop from +// // a template table) +// // to store the partition value and subpartition value. The partition value is +// // the sysdate when +// // the job was performed. The subpartition value is the mapper index... + public static final String COLUMN_NAME_EXPORT_PARTITION = + "SQOOP_EXPORT_SYSDATE"; + public static final String COLUMN_NAME_EXPORT_SUBPARTITION = + "SQOOP_MAPPER_ID"; + public static final String COLUMN_NAME_EXPORT_MAPPER_ROW = + "SQOOP_MAPPER_ROW"; + + public static final String ORAOOP_EXPORT_PARTITION_DATE_VALUE = + "oraoop.export.partition.date.value"; + public static final String ORAOOP_EXPORT_PARTITION_DATE_FORMAT = + "yyyy-mm-dd hh24:mi:ss"; +// +// +// +// // Boolean whether to do a consistent read based off an SCN +// public static final String ORAOOP_IMPORT_CONSISTENT_READ = +// "oraoop.import.consistent.read"; +// + // The SCN number to use for the consistent read + public static final String ORACLE_IMPORT_CONSISTENT_READ_SCN = + "oracle.import.consistent.read.scn"; +// +// // The method that will be used to create data chunks - ROWID ranges or +// // partitions +// public static final String ORAOOP_ORACLE_DATA_CHUNK_METHOD = +// "oraoop.chunk.method"; + +// // List of partitions to be imported, comma seperated list +// public static final String ORAOOP_IMPORT_PARTITION_LIST = +// "oraoop.import.partitions"; +// +// public static final OraOopOracleDataChunkMethod +// ORAOOP_ORACLE_DATA_CHUNK_METHOD_DEFAULT = +// OraOopOracleDataChunkMethod.ROWID; +// +// // How to allocate data-chunks into splits... +// public static final String ORAOOP_ORACLE_BLOCK_TO_SPLIT_ALLOCATION_METHOD = +// "oraoop.block.allocation"; +// + +// +// // Whether to omit LOB and LONG columns during an import... +// public static final String ORAOOP_IMPORT_OMIT_LOBS_AND_LONG = +// "oraoop.import.omit.lobs.and.long"; +// +// // Identifies an existing Oracle table used to create a new table as the +// // destination of a Sqoop export. +// // Hence, use of this property implies that the "-table" does not exist in +// // Oracle and OraOop should create it. +// public static final String ORAOOP_EXPORT_CREATE_TABLE_TEMPLATE = +// "oraoop.template.table"; +// +// // If the table already exists that we want to create, should we drop it?... +// public static final String ORAOOP_EXPORT_CREATE_TABLE_DROP = +// "oraoop.drop.table"; +// +// // If ORAOOP_EXPORT_CREATE_TABLE_TEMPLATE has been specified, then this flag +// // indicates whether the created Oracle +// // tables should have NOLOGGING... +// public static final String ORAOOP_EXPORT_CREATE_TABLE_NO_LOGGING = +// "oraoop.no.logging"; +// +// // If ORAOOP_EXPORT_CREATE_TABLE_TEMPLATE has been specified, then this flag +// // indicates whether the created Oracle +// // tables should be partitioned by job and mapper... +// public static final String ORAOOP_EXPORT_CREATE_TABLE_PARTITIONED = +// "oraoop.partitioned"; +// + // Indicates (internally) the the export table we're dealling with has been + // paritioned by Sqoop... + public static final String EXPORT_TABLE_HAS_SQOOP_PARTITIONS = + "sqoop.export.table.has.sqoop.partitions"; + + // When using the Oracle hint... /* +APPEND_VALUES */ ...a commit must be + // performed after each batch insert. + // Therefore, the batches need to be quite large to avoid a performance + // penality (for the 'extra' commits). + // This is the minimum batch size to use under these conditions... +// public static final String ORAOOP_MIN_APPEND_VALUES_BATCH_SIZE = +// "oraoop.min.append.values.batch.size"; + public static final int MIN_APPEND_VALUES_BATCH_SIZE_DEFAULT = 5000; +// +// // The version of the Oracle database we're connected to... +// public static final String ORAOOP_ORACLE_DATABASE_VERSION_MAJOR = +// "oraoop.oracle.database.version.major"; +// public static final String ORAOOP_ORACLE_DATABASE_VERSION_MINOR = +// "oraoop.oracle.database.version.minor"; +// +// // When OraOop creates a table for a Sqoop export (from a template table) and +// // the table contains partitions, +// // this is the prefix of those partition names. (This also allows us to later +// // identify partitions that OraOop +// // created.) + public static final String EXPORT_TABLE_PARTITION_NAME_PREFIX = "SQOOP_"; + + // When OraOop creates temporary tables for each mapper during a Sqoop export + // this is the prefix of table names... + public static final String EXPORT_MAPPER_TABLE_NAME_PREFIX = "SQOOP_"; + + // The format string used to turn a DATE into a string for use within the + // names of Oracle objects + // that we create. For example, temporary tables, table partitions, table + // subpartitions... + public static final String ORACLE_OBJECT_NAME_DATE_TO_STRING_FORMAT_STRING = + "yyyymmdd_hh24miss"; + +// // Indicates whether to perform a "merge" operation when performing a Sqoop +// // export. +// // If false, 'insert' statements will be used (i.e. no 'updates')... +// public static final String ORAOOP_EXPORT_MERGE = "oraoop.export.merge"; +// +// // This property allows the user to enable parallelization during exports... +// public static final String ORAOOP_EXPORT_PARALLEL = +// "oraoop.export.oracle.parallelization.enabled"; +// +// // Flag used to indicate that the Oracle table contains at least one column of +// // type BINARY_DOUBLE... +// public static final String TABLE_CONTAINS_BINARY_DOUBLE_COLUMN = +// "oraoop.table.contains.binary.double.column"; +// // Flag used to indicate that the Oracle table contains at least one column of +// // type BINARY_FLOAT... +// public static final String TABLE_CONTAINS_BINARY_FLOAT_COLUMN = +// "oraoop.table.contains.binary.float.column"; +// +// // The storage clause to append to the end of any CREATE TABLE statements we +// // execute for temporary Oracle tables... +// public static final String ORAOOP_TEMPORARY_TABLE_STORAGE_CLAUSE = +// "oraoop.temporary.table.storage.clause"; +// +// // The storage clause to append to the end of any CREATE TABLE statements we +// // execute for permanent (export) Oracle tables... +// public static final String ORAOOP_EXPORT_TABLE_STORAGE_CLAUSE = +// "oraoop.table.storage.clause"; +// +// // Additional columns to include with the --update-key column... +// public static final String ORAOOP_UPDATE_KEY_EXTRA_COLUMNS = +// "oraoop.update.key.extra.columns"; +// +// // Should OraOop map Timestamps as java.sql.Timestamp as Sqoop does, or as +// // String +// public static final String ORAOOP_MAP_TIMESTAMP_AS_STRING = +// "oraoop.timestamp.string"; +// public static final boolean ORAOOP_MAP_TIMESTAMP_AS_STRING_DEFAULT = true; +// +// // This flag allows the user to force use of the APPEND_VALUES Oracle hint +// // either ON, OFF or AUTO... +// public static final String ORAOOP_ORACLE_APPEND_VALUES_HINT_USAGE = +// "oraoop.oracle.append.values.hint.usage"; +// + /** + * Whether to use the append values hint for exports. + */ + public enum AppendValuesHintUsage { + AUTO, ON, OFF + } + + // http://download.oracle.com/docs/cd/E11882_01/server.112/e17118/ + // sql_elements001.htm#i45441 + public static final String SUPPORTED_IMPORT_ORACLE_DATA_TYPES_CLAUSE = + "(DATA_TYPE IN (" + + + // "'BFILE',"+ + "'BINARY_DOUBLE'," + + "'BINARY_FLOAT'," + + "'BLOB'," + + "'CHAR'," + + "'CLOB'," + + "'DATE'," + + "'FLOAT'," + + "'LONG'," + + + // "'LONG RAW',"+ + // "'MLSLABEL',"+ + "'NCHAR'," + + "'NCLOB'," + + "'NUMBER'," + + "'NVARCHAR2'," + + "'RAW'," + + "'ROWID'," + + + // "'UNDEFINED',"+ + "'URITYPE'," + + + // "'UROWID',"+ //<- SqlType = 1111 = "OTHER" Not supported as + // "AAAAACAADAAAAAEAAF" is being returned as "AAAAAAgADAAAA" + "'VARCHAR2'" + + // <- Columns declared as VARCHAR are listed as VARCHAR2 in + // dba_tabl_columns + // "'XMLTYPE',"+ + ")" + " OR DATA_TYPE LIKE 'INTERVAL YEAR(%) TO MONTH'" + + " OR DATA_TYPE LIKE 'INTERVAL DAY(%) TO SECOND(%)'" + + " OR DATA_TYPE LIKE 'TIMESTAMP(%)'" + + " OR DATA_TYPE LIKE 'TIMESTAMP(%) WITH TIME ZONE'" + + " OR DATA_TYPE LIKE 'TIMESTAMP(%) WITH LOCAL TIME ZONE'" + ")"; + + public static final String SUPPORTED_EXPORT_ORACLE_DATA_TYPES_CLAUSE = + "(DATA_TYPE IN (" + + + // "'BFILE',"+ + "'BINARY_DOUBLE'," + + "'BINARY_FLOAT'," + + + // "'BLOB',"+ //<- Jira: SQOOP-117 Sqoop cannot export LOB data + "'CHAR'," + + + // "'CLOB',"+ //<- Jira: SQOOP-117 Sqoop cannot export LOB data + "'DATE'," + + "'FLOAT'," + + + // "'LONG',"+ //<- "create table as select..." and + // "insert into table as select..." do not work when a long column + // exists. + // "'LONG RAW',"+ + // "'MLSLABEL',"+ + "'NCHAR'," + + + // "'NCLOB',"+ //<- Jira: SQOOP-117 Sqoop cannot export LOB data + "'NUMBER'," + + "'NVARCHAR2'," + + + // "'RAW',"+ + "'ROWID'," + + + // "'UNDEFINED',"+ + "'URITYPE'," + + + // "'UROWID',"+ //<- SqlType = 1111 = "OTHER" Not supported as + // "AAAAACAADAAAAAEAAF" is being returned as "AAAAAAgADAAAA" + "'VARCHAR2'" + + // <- Columns declared as VARCHAR are listed as VARCHAR2 in + // dba_tabl_columns + // "'XMLTYPE',"+ + ")" + " OR DATA_TYPE LIKE 'INTERVAL YEAR(%) TO MONTH'" + + " OR DATA_TYPE LIKE 'INTERVAL DAY(%) TO SECOND(%)'" + + " OR DATA_TYPE LIKE 'TIMESTAMP(%)'" + + " OR DATA_TYPE LIKE 'TIMESTAMP(%) WITH TIME ZONE'" + + " OR DATA_TYPE LIKE 'TIMESTAMP(%) WITH LOCAL TIME ZONE'" + ")"; + +// // Query to get current logged on user +// public static final String QUERY_GET_SESSION_USER = "SELECT USER FROM DUAL"; +// +// // public static final int[] SUPPORTED_ORACLE_DATA_TYPES = { +// // oracle.jdbc.OracleTypes.BIT // -7; +// // ,oracle.jdbc.OracleTypes.TINYINT // -6; +// // ,oracle.jdbc.OracleTypes.SMALLINT // 5; +// // ,oracle.jdbc.OracleTypes.INTEGER // 4; +// // ,oracle.jdbc.OracleTypes.BIGINT // -5; +// // ,oracle.jdbc.OracleTypes.FLOAT // 6; +// // ,oracle.jdbc.OracleTypes.REAL // 7; +// // ,oracle.jdbc.OracleTypes.DOUBLE // 8; +// // ,oracle.jdbc.OracleTypes.NUMERIC // 2; +// // ,oracle.jdbc.OracleTypes.DECIMAL // 3; +// // ,oracle.jdbc.OracleTypes.CHAR // 1; +// // ,oracle.jdbc.OracleTypes.VARCHAR // 12; +// // ,oracle.jdbc.OracleTypes.LONGVARCHAR // -1; +// // ,oracle.jdbc.OracleTypes.DATE // 91; +// // ,oracle.jdbc.OracleTypes.TIME // 92; +// // ,oracle.jdbc.OracleTypes.TIMESTAMP // 93; +// // // ,oracle.jdbc.OracleTypes.TIMESTAMPNS // -100; //<- Deprecated +// // ,oracle.jdbc.OracleTypes.TIMESTAMPTZ // -101; +// // ,oracle.jdbc.OracleTypes.TIMESTAMPLTZ // -102; +// // ,oracle.jdbc.OracleTypes.INTERVALYM // -103; +// // ,oracle.jdbc.OracleTypes.INTERVALDS // -104; +// // ,oracle.jdbc.OracleTypes.BINARY // -2; +// // /// ,oracle.jdbc.OracleTypes.VARBINARY // -3; +// // ,oracle.jdbc.OracleTypes.LONGVARBINARY // -4; +// // ,oracle.jdbc.OracleTypes.ROWID // -8; +// // ,oracle.jdbc.OracleTypes.CURSOR // -10; +// // ,oracle.jdbc.OracleTypes.BLOB // 2004; +// // ,oracle.jdbc.OracleTypes.CLOB // 2005; +// // // ,oracle.jdbc.OracleTypes.BFILE // -13; +// // // ,oracle.jdbc.OracleTypes.STRUCT // 2002; +// // // ,oracle.jdbc.OracleTypes.ARRAY // 2003; +// // ,oracle.jdbc.OracleTypes.REF // 2006; +// // ,oracle.jdbc.OracleTypes.NCHAR // -15; +// // ,oracle.jdbc.OracleTypes.NCLOB // 2011; +// // ,oracle.jdbc.OracleTypes.NVARCHAR // -9; +// // ,oracle.jdbc.OracleTypes.LONGNVARCHAR // -16; +// // // ,oracle.jdbc.OracleTypes.SQLXML // 2009; +// // // ,oracle.jdbc.OracleTypes.OPAQUE // 2007; +// // // ,oracle.jdbc.OracleTypes.JAVA_STRUCT // 2008; +// // // ,oracle.jdbc.OracleTypes.JAVA_OBJECT // 2000; +// // // ,oracle.jdbc.OracleTypes.PLSQL_INDEX_TABLE // -14; +// // ,oracle.jdbc.OracleTypes.BINARY_FLOAT // 100; +// // ,oracle.jdbc.OracleTypes.BINARY_DOUBLE // 101; +// // ,oracle.jdbc.OracleTypes.NULL // 0; +// // ,oracle.jdbc.OracleTypes.NUMBER // 2; +// // // ,oracle.jdbc.OracleTypes.RAW // -2; +// // // ,oracle.jdbc.OracleTypes.OTHER // 1111; +// // ,oracle.jdbc.OracleTypes.FIXED_CHAR // 999; +// // // ,oracle.jdbc.OracleTypes.DATALINK // 70; +// // ,oracle.jdbc.OracleTypes.BOOLEAN // 16; +// // }; +// +// /** +// * Constants for things belonging to sqoop... +// */ +// public static final class Sqoop { +// private Sqoop() { +// } +// +// /** +// * What type of Sqoop tool is being run. +// */ +// public enum Tool { +// UNKNOWN, IMPORT, EXPORT +// } +// +// public static final String IMPORT_TOOL_NAME = "import"; +// public static final String MAX_MAPREDUCE_ATTEMPTS = +// "mapred.map.max.attempts"; +// } +// +/** + * Constants for things belonging to Oracle... + */ + public static final class Oracle { + private Oracle() { + } + + public static final int ROWID_EXTENDED_ROWID_TYPE = 1; + public static final int ROWID_MAX_ROW_NUMBER_PER_BLOCK = 32767; + + // This is how you comment-out a line of SQL text in Oracle. + public static final String ORACLE_SQL_STATEMENT_COMMENT_TOKEN = "--"; + + public static final String OBJECT_TYPE_TABLE = "TABLE"; + + public static final String URITYPE = "URITYPE"; + + public static final int MAX_IDENTIFIER_LENGTH = 30; // <- Max length of an + // Oracle name + // (table-name, + // partition-name etc.) + + public static final String HINT_SYNTAX = "/*+ %s */ "; // Syntax for a hint + // in Oracle + } +// +// /** +// * Logging constants. +// */ +// public static class Logging { +// /** +// * Level of log to output. +// */ +// public enum Level { +// TRACE, DEBUG, INFO, WARN, ERROR, FATAL +// } +// } +// + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnectorUpgrader.java ---------------------------------------------------------------------- diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnectorUpgrader.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnectorUpgrader.java new file mode 100644 index 0000000..30693af --- /dev/null +++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnectorUpgrader.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc.oracle; + +import org.apache.sqoop.configurable.ConfigurableUpgradeUtil; +import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader; +import org.apache.sqoop.model.MFromConfig; +import org.apache.sqoop.model.MLinkConfig; +import org.apache.sqoop.model.MToConfig; + +// NOTE: All config types have the similar upgrade path at this point +public class OracleJdbcConnectorUpgrader extends ConnectorConfigurableUpgrader { + + @Override + public void upgradeLinkConfig(MLinkConfig original, MLinkConfig upgradeTarget) { + ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs()); + } + + @Override + public void upgradeFromJobConfig(MFromConfig original, MFromConfig upgradeTarget) { + ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs()); + } + + @Override + public void upgradeToJobConfig(MToConfig original, MToConfig upgradeTarget) { + ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs()); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcExtractor.java ---------------------------------------------------------------------- diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcExtractor.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcExtractor.java new file mode 100644 index 0000000..df15fc2 --- /dev/null +++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcExtractor.java @@ -0,0 +1,361 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc.oracle; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; + +import org.apache.commons.lang.BooleanUtils; +import org.apache.log4j.Logger; +import org.apache.sqoop.common.ImmutableContext; +import org.apache.sqoop.connector.jdbc.oracle.configuration.FromJobConfig; +import org.apache.sqoop.connector.jdbc.oracle.configuration.FromJobConfiguration; +import org.apache.sqoop.connector.jdbc.oracle.configuration.LinkConfiguration; +import org.apache.sqoop.connector.jdbc.oracle.util.OracleConnectionFactory; +import org.apache.sqoop.connector.jdbc.oracle.util.OracleDataChunk; +import org.apache.sqoop.connector.jdbc.oracle.util.OracleQueries; +import org.apache.sqoop.connector.jdbc.oracle.util.OracleTable; +import org.apache.sqoop.connector.jdbc.oracle.util.OracleTableColumn; +import org.apache.sqoop.connector.jdbc.oracle.util.OracleTableColumns; +import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities; +import org.apache.sqoop.job.etl.Extractor; +import org.apache.sqoop.job.etl.ExtractorContext; +import org.apache.sqoop.schema.type.Column; +import org.apache.sqoop.schema.type.ColumnType; +import org.joda.time.DateTime; +import org.joda.time.LocalDateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +public class OracleJdbcExtractor extends + Extractor<LinkConfiguration, FromJobConfiguration, OracleJdbcPartition> { + + private static final Logger LOG = Logger.getLogger(OracleJdbcExtractor.class); + + private Connection connection; + private OracleTable table; + private int mapperId; // <- The index of this Hadoop mapper + private long rowsRead = 0; + + private OracleTableColumns tableColumns; + + private OracleJdbcPartition dbInputSplit; // <- The split this record-reader + // is working on. + private int numberOfBlocksInThisSplit; // <- The number of Oracle blocks in + // this Oracle data-chunk. + private int numberOfBlocksProcessedInThisSplit; // <- How many Oracle blocks + // we've processed with this + // record-reader. + private String currentDataChunkId; // <- The id of the current data-chunk + // being processed + private ResultSet results; // <- The ResultSet containing the data from the + // query returned by getSelectQuery() + private int columnIndexDataChunkIdZeroBased = -1; // <- The zero-based column + // index of the + // data_chunk_id column. + private boolean progressCalculationErrorLogged; // <- Whether we've logged a + // problem with the progress + // calculation during + // nextKeyValue(). + private Object oraOopOraStats; // <- A reference to the Oracle statistics + // object that is being tracked for this Oracle + // session. + private boolean profilingEnabled; // <- Whether to collect profiling metrics + private long timeSpentInNextKeyValueInNanoSeconds; // <- Total time spent in + // super.nextKeyValue() + + private static final DateTimeFormatter TIMESTAMP_TIMEZONE = + DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS z"); + + @Override + public void extract(ExtractorContext context, + LinkConfiguration linkConfiguration, + FromJobConfiguration jobConfiguration, OracleJdbcPartition partition) { + //TODO: Mapper ID + mapperId = 1; + dbInputSplit = partition; + + // Retrieve the JDBC URL that should be used by this mapper. + String mapperJdbcUrlPropertyName = + OracleUtilities.getMapperJdbcUrlPropertyName(mapperId); + String mapperJdbcUrl = context.getString(mapperJdbcUrlPropertyName, null); + + LOG.debug(String.format("Mapper %d has a JDBC URL of: %s", mapperId, + mapperJdbcUrl == null ? "<null>" : mapperJdbcUrl)); + + try { + connection = OracleConnectionFactory.createOracleJdbcConnection( + OracleJdbcConnectorConstants.ORACLE_JDBC_DRIVER_CLASS, + mapperJdbcUrl, + linkConfiguration.connectionConfig.username, + linkConfiguration.connectionConfig.password); + } catch (SQLException ex) { + throw new RuntimeException(String.format( + "Unable to connect to the Oracle database at %s\nError:%s", + linkConfiguration.connectionConfig.connectionString, ex + .getMessage()), ex); + } + + table = OracleUtilities.decodeOracleTableName( + linkConfiguration.connectionConfig.username, + jobConfiguration.fromJobConfig.tableName); + + try { + String thisOracleInstanceName = + OracleQueries.getCurrentOracleInstanceName(connection); + + LOG.info(String.format( + "This record reader is connected to Oracle via the JDBC URL: \n" + + "\t\"%s\"\n" + "\tto the Oracle instance: \"%s\"", mapperJdbcUrl, + thisOracleInstanceName)); + + OracleConnectionFactory.initializeOracleConnection( + connection, linkConfiguration.connectionConfig); + } catch(SQLException ex) { + throw new RuntimeException(String.format( + "Unable to initialize connection to the Oracle database at %s\n" + + "Error:%s", + linkConfiguration.connectionConfig.connectionString, ex + .getMessage()), ex); + } + + try { + tableColumns = + OracleQueries.getFromTableColumns(connection, table, OracleUtilities. + omitLobAndLongColumnsDuringImport(jobConfiguration.fromJobConfig), + true // <- onlyOraOopSupportedTypes + ); + } catch (SQLException ex) { + LOG.error(String.format( + "Unable to obtain the data-types of the columns in table %s.\n" + + "Error:\n%s", table.toString(), ex.getMessage())); + throw new RuntimeException(ex); + } + + this.numberOfBlocksInThisSplit = + this.dbInputSplit.getTotalNumberOfBlocksInThisSplit(); + this.numberOfBlocksProcessedInThisSplit = 0; + + extractData(context, jobConfiguration.fromJobConfig); + + try { + connection.close(); + } catch(SQLException ex) { + throw new RuntimeException(String.format( + "Unable to close connection to the Oracle database at %s\nError:%s", + linkConfiguration.connectionConfig.connectionString, ex + .getMessage()), ex); + } + } + + private Object getObjectAtName(ResultSet resultSet, + OracleTableColumn column, Column sqoopColumn) throws SQLException { + Object result = null; + if(sqoopColumn.getType() == ColumnType.TEXT) { + result = resultSet.getString(column.getName()); + } else if (column.getOracleType() == OracleQueries + .getOracleType("TIMESTAMP")) { + Timestamp timestamp = resultSet.getTimestamp(column.getName()); + if(timestamp!=null) { + result = LocalDateTime.fromDateFields(timestamp); + } + } else if (column.getOracleType() == OracleQueries + .getOracleType("TIMESTAMPTZ") + || column.getOracleType() == OracleQueries + .getOracleType("TIMESTAMPLTZ")) { + Timestamp timestamp = resultSet.getTimestamp(column.getName()); + if(timestamp!=null) { + //TODO: BC dates + String dateTimeStr = resultSet.getString(column.getName()); + result = DateTime.parse(dateTimeStr, TIMESTAMP_TIMEZONE); + } + } else { + result = resultSet.getObject(column.getName()); + } + return result; + } + + private void extractData(ExtractorContext context, FromJobConfig jobConfig) { + String sql = getSelectQuery(jobConfig, context.getContext()); + Column[] columns = context.getSchema().getColumnsArray(); + int columnCount = columns.length; + try { + PreparedStatement statement = connection.prepareStatement(sql); + ResultSet resultSet = statement.executeQuery(); + + while(resultSet.next()) { + Object[] array = new Object[columnCount]; + for(int i = 0; i < columnCount; i++) { + OracleTableColumn tableColumn = + tableColumns.findColumnByName(columns[i].getName()); + array[i] = getObjectAtName(resultSet, tableColumn, columns[i]); + } + context.getDataWriter().writeArrayRecord(array); + rowsRead++; + } + + resultSet.close(); + statement.close(); + } catch (SQLException ex) { + LOG.error(String.format("Error in %s while executing the SQL query:\n" + + "%s\n\n" + "%s", OracleUtilities.getCurrentMethodName(), sql, ex + .getMessage())); + throw new RuntimeException(ex); + } + } + + @Override + public long getRowsRead() { + return rowsRead; + } + + private String getSelectQuery(FromJobConfig jobConfig, + ImmutableContext context) { + + boolean consistentRead = BooleanUtils.isTrue(jobConfig.consistentRead); + long consistentReadScn = context.getLong( + OracleJdbcConnectorConstants.ORACLE_IMPORT_CONSISTENT_READ_SCN, 0L); + if (consistentRead && consistentReadScn == 0L) { + throw new RuntimeException("Could not get SCN for consistent read."); + } + + StringBuilder query = new StringBuilder(); + + if (this.dbInputSplit.getDataChunks() == null) { + String errMsg = + String.format("The %s does not contain any data-chunks, within %s.", + this.dbInputSplit.getClass().getName(), OracleUtilities + .getCurrentMethodName()); + throw new RuntimeException(errMsg); + } + + OracleUtilities.OracleTableImportWhereClauseLocation whereClauseLocation = + OracleUtilities.getTableImportWhereClauseLocation(jobConfig, + OracleUtilities.OracleTableImportWhereClauseLocation.SUBSPLIT); + + int numberOfDataChunks = this.dbInputSplit.getNumberOfDataChunks(); + for (int idx = 0; idx < numberOfDataChunks; idx++) { + + OracleDataChunk dataChunk = + this.dbInputSplit.getDataChunks().get(idx); + + if (idx > 0) { + query.append("UNION ALL \n"); + } + + query.append(getColumnNamesClause(tableColumns, + dataChunk.getId(), jobConfig)) // <- SELECT clause + .append("\n"); + + query.append(" FROM ").append(table.toString()).append(" "); + + if (consistentRead) { + query.append("AS OF SCN ").append(consistentReadScn).append(" "); + } + + query.append(getPartitionClauseForDataChunk(this.dbInputSplit, idx)) + .append(" t").append("\n"); + + query.append(" WHERE (").append( + getWhereClauseForDataChunk(this.dbInputSplit, idx)).append(")\n"); + + // If the user wants the WHERE clause applied to each data-chunk... + if (whereClauseLocation == OracleUtilities. + OracleTableImportWhereClauseLocation.SUBSPLIT) { + String conditions = jobConfig.conditions; + if (conditions != null && conditions.length() > 0) { + query.append(" AND (").append(conditions).append(")\n"); + } + } + + } + + // If the user wants the WHERE clause applied to the whole split... + if (whereClauseLocation == OracleUtilities. + OracleTableImportWhereClauseLocation.SPLIT) { + String conditions = jobConfig.conditions; + if (conditions != null && conditions.length() > 0) { + + // Insert a "select everything" line at the start of the SQL query... + query.insert(0, getColumnNamesClause(tableColumns, null, jobConfig) + + " FROM (\n"); + + // ...and then apply the WHERE clause to all the UNIONed sub-queries... + query.append(")\n").append("WHERE\n").append(conditions).append("\n"); + } + } + + LOG.info("SELECT QUERY = \n" + query.toString()); + + return query.toString(); + } + + private String getColumnNamesClause(OracleTableColumns tableColumns, + String dataChunkId, FromJobConfig jobConfig) { + + StringBuilder result = new StringBuilder(); + + result.append("SELECT "); + result.append(OracleUtilities.getImportHint(jobConfig)); + + int firstFieldIndex = 0; + int lastFieldIndex = tableColumns.size(); + for (int i = firstFieldIndex; i < lastFieldIndex; i++) { + if (i > firstFieldIndex) { + result.append(","); + } + + OracleTableColumn oracleTableColumn = tableColumns.get(i); + String fieldName = oracleTableColumn.getName(); + if (oracleTableColumn != null) { + if (oracleTableColumn.getDataType().equals( + OracleJdbcConnectorConstants.Oracle.URITYPE)) { + fieldName = String.format("uritype.geturl(%s) %s", fieldName, + fieldName); + } + } + + result.append(fieldName); + } + // We need to insert the value of that data_chunk_id now... + if (dataChunkId != null && !dataChunkId.isEmpty()) { + String fieldName = + String.format(",'%s' %s", dataChunkId, + OracleJdbcConnectorConstants.COLUMN_NAME_DATA_CHUNK_ID); + result.append(fieldName); + } + return result.toString(); + } + + private String getPartitionClauseForDataChunk(OracleJdbcPartition split, + int dataChunkIndex) { + OracleDataChunk dataChunk = split.getDataChunks().get(dataChunkIndex); + return dataChunk.getPartitionClause(); + } + + private String getWhereClauseForDataChunk(OracleJdbcPartition split, + int dataChunkIndex) { + + OracleDataChunk dataChunk = split.getDataChunks().get(dataChunkIndex); + return dataChunk.getWhereClause(); + } + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcFromDestroyer.java ---------------------------------------------------------------------- diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcFromDestroyer.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcFromDestroyer.java new file mode 100644 index 0000000..bd6fd0a --- /dev/null +++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcFromDestroyer.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc.oracle; + +import org.apache.sqoop.connector.jdbc.oracle.configuration.FromJobConfiguration; +import org.apache.sqoop.connector.jdbc.oracle.configuration.LinkConfiguration; +import org.apache.sqoop.job.etl.Destroyer; +import org.apache.sqoop.job.etl.DestroyerContext; + +public class OracleJdbcFromDestroyer extends + Destroyer<LinkConfiguration, FromJobConfiguration> { + + @Override + public void destroy(DestroyerContext context, + LinkConfiguration linkConfiguration, + FromJobConfiguration jobConfiguration) { + // TODO Auto-generated method stub + + } + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcFromInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcFromInitializer.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcFromInitializer.java new file mode 100644 index 0000000..62a0e84 --- /dev/null +++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcFromInitializer.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc.oracle; + +import java.sql.SQLException; + +import org.apache.commons.lang.BooleanUtils; +import org.apache.log4j.Logger; +import org.apache.sqoop.connector.jdbc.oracle.configuration.FromJobConfiguration; +import org.apache.sqoop.connector.jdbc.oracle.configuration.LinkConfiguration; +import org.apache.sqoop.connector.jdbc.oracle.util.OracleQueries; +import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities; +import org.apache.sqoop.job.etl.InitializerContext; + +public class OracleJdbcFromInitializer extends + OracleJdbcCommonInitializer<FromJobConfiguration> { + + private static final Logger LOG = + Logger.getLogger(OracleJdbcFromInitializer.class); + + @Override + public void connect(InitializerContext context, + LinkConfiguration linkConfiguration, + FromJobConfiguration jobConfiguration) throws SQLException { + super.connect(context, linkConfiguration, jobConfiguration); + table = OracleUtilities.decodeOracleTableName( + linkConfiguration.connectionConfig.username, + jobConfiguration.fromJobConfig.tableName); + } + + @Override + public void initialize(InitializerContext context, + LinkConfiguration linkConfiguration, + FromJobConfiguration jobConfiguration) { + super.initialize(context, linkConfiguration, jobConfiguration); + LOG.debug("Running Oracle JDBC connector FROM initializer"); + + try { + if(OracleQueries.isTableAnIndexOrganizedTable(connection, table)) { + if(OracleUtilities.getOraOopOracleDataChunkMethod( + jobConfiguration.fromJobConfig) != + OracleUtilities.OracleDataChunkMethod.PARTITION) { + throw new RuntimeException(String.format("Cannot process this Sqoop" + + " connection, as the Oracle table %s is an" + + " index-organized table. If the table is" + + " partitioned, set the data chunk method to " + + OracleUtilities.OracleDataChunkMethod.PARTITION + + ".", + table.toString())); + } + } + } catch (SQLException e) { + throw new RuntimeException(String.format( + "Unable to determine whether the Oracle table %s is an" + + "index-organized table.", table.toString()), e); + } + + if(BooleanUtils.isTrue(jobConfiguration.fromJobConfig.consistentRead)) { + Long scn = jobConfiguration.fromJobConfig.consistentReadScn; + if(scn==null || scn.equals(Long.valueOf(0L))) { + try { + scn = OracleQueries.getCurrentScn(connection); + } catch(SQLException e) { + throw new RuntimeException("Unable to determine SCN of database.", + e); + } + } + context.getContext().setLong( + OracleJdbcConnectorConstants.ORACLE_IMPORT_CONSISTENT_READ_SCN, + scn); + LOG.info("Performing a consistent read using SCN: " + scn); + } + } + +}
