// File: connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcLoader.java
// (added as a new file in Apache Sqoop commit fa3c77b6)
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.sqoop.connector.jdbc.oracle;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

import org.apache.log4j.Logger;
import org.apache.sqoop.connector.jdbc.oracle.configuration.LinkConfiguration;
import org.apache.sqoop.connector.jdbc.oracle.configuration.ToJobConfig;
import org.apache.sqoop.connector.jdbc.oracle.configuration.ToJobConfiguration;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleConnectionFactory;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleQueries;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleTable;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleTableColumn;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleTableColumns;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities.InsertMode;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities.UpdateMode;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleVersion;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
import org.apache.sqoop.schema.type.Column;
import org.joda.time.LocalDateTime;

/**
 * Sqoop {@link Loader} that writes the records supplied by the
 * {@link LoaderContext} data reader into an Oracle table using a batched,
 * named-bind-variable INSERT statement.
 *
 * <p>Depending on the job configuration the rows go either into the target
 * table itself or into a mapper-specific table (exchange-partition inserts
 * and all update-key jobs).</p>
 */
public class OracleJdbcLoader extends Loader<LinkConfiguration, ToJobConfiguration> {

  // Fixed: the logger used to be created for OracleJdbcToDestroyer, which
  // attributed this class's output to the wrong log category.
  private static final Logger LOG =
      Logger.getLogger(OracleJdbcLoader.class);

  private long rowsWritten = 0;
  private LoaderContext context;
  private Connection connection;
  private OracleVersion oracleVersion;

  // If exporting into a partitioned table, this table will be unique for
  // this mapper.
  private OracleTable table;

  // The columns in the table we're inserting rows into.
  private OracleTableColumns tableColumns;

  // The index of this Hadoop mapper.
  private int mapperId;

  // Whether the export table contains the column SQOOP_MAPPER_ROW.
  private boolean tableHasMapperRowNumberColumn;

  // The 1-based row number being processed by this mapper. It's inserted
  // into the "SQOOP_MAPPER_ROW" column.
  private long mapperRowNumber;

  // Whether to use the " /*+APPEND_VALUES*/ " hint within the Oracle SQL
  // statement we generate.
  private boolean useAppendValuesOracleHint = false;

  // The number of rows read by insertData() that had a NULL value for (one
  // of) the update columns. Such rows are skipped.
  private long numberOfRowsSkipped;

  private String[] updateColumnNames;
  private int rowsPerBatch;
  private int rowsPerCommit;

  /**
   * Prepares the loader for a plain insert job (no update key configured):
   * resolves the destination table, reads its columns and decides whether the
   * APPEND_VALUES hint is a candidate.
   *
   * @param linkConfiguration link settings; the user name is used to decode
   *        the table name
   * @param jobConfiguration job settings holding the target table name
   * @throws SQLException if the table's columns cannot be queried
   */
  private void setupInsert(LinkConfiguration linkConfiguration,
      ToJobConfiguration jobConfiguration) throws SQLException {
    // Is each mapper inserting rows into a unique table?...
    InsertMode insertMode = OracleUtilities.getExportInsertMode(
        jobConfiguration.toJobConfig, context.getContext());

    if (insertMode == InsertMode.ExchangePartition) {
      Object sysDateTime =
          OracleUtilities.recallOracleDateTime(context.getContext());
      table = OracleUtilities.generateExportTableMapperTableName(
          mapperId, sysDateTime, null);
    } else {
      table = OracleUtilities.decodeOracleTableName(
          linkConfiguration.connectionConfig.username,
          jobConfiguration.toJobConfig.tableName);
    }

    tableColumns = OracleQueries.getToTableColumns(
        connection, table, true, false);
    tableHasMapperRowNumberColumn =
        tableColumns.findColumnByName(
            OracleJdbcConnectorConstants.COLUMN_NAME_EXPORT_MAPPER_ROW) != null;

    // Should we use the APPEND_VALUES Oracle hint?...
    useAppendValuesOracleHint = false;
    if (insertMode == InsertMode.ExchangePartition) {
      // NB: "Direct inserts" cannot utilize APPEND_VALUES, otherwise Oracle
      // will serialize the N mappers, causing a lot of lock contention.
      useAppendValuesOracleHint = canUseOracleAppendValuesHint();
    }
  }

  /**
   * Prepares the loader for a job that has an update key configured: rows are
   * written to a mapper-specific table and the update-key column names are
   * remembered so that rows with a NULL key can be skipped.
   *
   * @param linkConfiguration link settings (not read by this method)
   * @param jobConfiguration job settings holding the update key definition
   * @throws SQLException if the table's columns cannot be queried
   */
  private void setupUpdate(LinkConfiguration linkConfiguration,
      ToJobConfiguration jobConfiguration) throws SQLException {
    UpdateMode updateMode = OracleUtilities.getExportUpdateMode(
        jobConfiguration.toJobConfig);

    Object sysDateTime =
        OracleUtilities.recallOracleDateTime(context.getContext());
    table = OracleUtilities.generateExportTableMapperTableName(
        mapperId, sysDateTime, null);

    updateColumnNames = OracleUtilities.
        getExportUpdateKeyColumnNames(jobConfiguration.toJobConfig);

    tableColumns = OracleQueries.getToTableColumns(
        connection, table, true, false);

    if (updateMode == UpdateMode.Merge || updateMode == UpdateMode.Update) {
      // Should we use the APPEND_VALUES Oracle hint?...
      useAppendValuesOracleHint = canUseOracleAppendValuesHint();
    }
  }

  /**
   * Connects to Oracle, configures insert or update mode and writes every
   * record delivered by the context's data reader.
   *
   * <p>The connection is closed when loading finishes. If loading fails it is
   * still closed (best effort) so that a failed task does not leak a database
   * session, and the original failure is the one that propagates.</p>
   *
   * @param context loader context providing configuration values, the schema
   *        and the data reader
   * @param linkConfiguration connection settings (user name, password, ...)
   * @param jobConfiguration destination settings (table name, update key, ...)
   * @throws Exception if connecting, preparing or writing fails
   */
  @Override
  public void load(LoaderContext context, LinkConfiguration linkConfiguration,
      ToJobConfiguration jobConfiguration) throws Exception {
    LOG.debug("Running Oracle JDBC connector loader");
    this.context = context;

    //TODO: Mapper ID
    mapperId = 1;
    //TODO: Hardcoded values
    rowsPerBatch = 5000;
    rowsPerCommit = 5000;

    // Each mapper may have been given its own JDBC URL; look it up under the
    // mapper-specific property name.
    String mapperJdbcUrlPropertyName =
        OracleUtilities.getMapperJdbcUrlPropertyName(mapperId);

    // Get this mapper's JDBC URL
    String mapperJdbcUrl = context.getString(mapperJdbcUrlPropertyName, null);

    LOG.debug(String.format("Mapper %d has a JDBC URL of: %s", mapperId,
        mapperJdbcUrl == null ? "<null>" : mapperJdbcUrl));

    connection = OracleConnectionFactory.createOracleJdbcConnection(
        OracleJdbcConnectorConstants.ORACLE_JDBC_DRIVER_CLASS,
        mapperJdbcUrl,
        linkConfiguration.connectionConfig.username,
        linkConfiguration.connectionConfig.password);

    boolean loaded = false;
    try {
      String thisOracleInstanceName =
          OracleQueries.getCurrentOracleInstanceName(connection);
      LOG.info(String.format(
          "This record writer is connected to Oracle via the JDBC URL: \n"
              + "\t\"%s\"\n" + "\tto the Oracle instance: \"%s\"", connection
              .toString(), thisOracleInstanceName));
      OracleConnectionFactory.initializeOracleConnection(
          connection, linkConfiguration.connectionConfig);
      connection.setAutoCommit(false);
      oracleVersion = OracleQueries.getOracleVersion(connection);

      if (jobConfiguration.toJobConfig.updateKey == null
          || jobConfiguration.toJobConfig.updateKey.isEmpty()) {
        setupInsert(linkConfiguration, jobConfiguration);
      } else {
        setupUpdate(linkConfiguration, jobConfiguration);
      }

      // Has the user forced the use of APPEND_VALUES either on or off?...
      useAppendValuesOracleHint =
          allowUserToOverrideUseOfTheOracleAppendValuesHint(
              jobConfiguration.toJobConfig,
              useAppendValuesOracleHint);

      insertData();
      loaded = true;
    } finally {
      if (loaded) {
        // Success path: a failure to close is reported to the caller, exactly
        // as before.
        connection.close();
      } else {
        // Failure path: release the session without masking the exception
        // that is already propagating.
        closeQuietly(connection);
      }
    }
  }

  /** @return the number of rows added to insert batches so far */
  @Override
  public long getRowsWritten() {
    return rowsWritten;
  }

  /**
   * Reads all records from the data reader and inserts them in batches,
   * committing every {@code rowsPerCommit} rows and once more at the end.
   * Records with a NULL in any update-key column are counted and skipped.
   *
   * @throws Exception if reading a record or executing the insert fails
   */
  private void insertData() throws Exception {
    // If using APPEND_VALUES, check the batch size and commit frequency...
    if (useAppendValuesOracleHint) {
      if (rowsPerBatch < OracleJdbcConnectorConstants.
          MIN_APPEND_VALUES_BATCH_SIZE_DEFAULT) {
        LOG.info(String
            .format(
                "The number of rows per batch-insert has been changed from %d "
                    + "to %d. This is in response "
                    + "to the Oracle APPEND_VALUES hint being used.",
                rowsPerBatch, OracleJdbcConnectorConstants.
                MIN_APPEND_VALUES_BATCH_SIZE_DEFAULT));
        rowsPerBatch = OracleJdbcConnectorConstants.
            MIN_APPEND_VALUES_BATCH_SIZE_DEFAULT;
      }
      // Need to commit after each batch when using APPEND_VALUES
      if (rowsPerCommit != rowsPerBatch) {
        LOG.info(String
            .format(
                "The number of rows to insert per commit has been "
                    + "changed from %d to %d. This is in response "
                    + "to the Oracle APPEND_VALUES hint being used.",
                rowsPerCommit, rowsPerBatch));
        rowsPerCommit = rowsPerBatch;
      }
    }

    mapperRowNumber = 1;

    String sql = getBatchInsertSqlStatement(useAppendValuesOracleHint
        ? "/*+APPEND_VALUES*/" : "");
    PreparedStatement statement = connection.prepareStatement(sql);

    boolean completed = false;
    try {
      Column[] columns = context.getSchema().getColumnsArray();

      // Positions (within a record) of the update-key columns, or null when
      // no update key is configured.
      List<Integer> updateColumnIndexes = null;
      if (updateColumnNames != null) {
        updateColumnIndexes = new ArrayList<Integer>();
        for (int idx = 0; idx < this.updateColumnNames.length; idx++) {
          for (int i = 0; i < columns.length; i++) {
            if (columns[i].getName().equals(updateColumnNames[idx])) {
              updateColumnIndexes.add(i);
            }
          }
        }
      }

      Object[] array;
      while ((array = context.getDataReader().readArrayRecord()) != null) {
        if (updateColumnIndexes != null
            && hasNullUpdateKey(array, updateColumnIndexes)) {
          this.numberOfRowsSkipped++;
          continue;
        }

        rowsWritten++;
        configurePreparedStatementColumns(statement, columns, array);
        if (rowsWritten % rowsPerBatch == 0) {
          statement.executeBatch();
        }
        if (rowsWritten % rowsPerCommit == 0) {
          connection.commit();
        }
      }

      // Flush a trailing, partially filled batch.
      if (rowsWritten % rowsPerBatch != 0) {
        statement.executeBatch();
      }
      connection.commit();
      completed = true;
    } finally {
      if (completed) {
        statement.close();
      } else {
        // Do not let a secondary close failure hide the real error.
        closeQuietly(statement);
      }
    }

    if (numberOfRowsSkipped > 0) {
      LOG.warn(String.format(
          "%d records were skipped due to a NULL value within one of the "
              + "update-key column(s).\nHaving a NULL value prevents a record "
              + "from being able to be matched to a row in the Oracle table.",
          numberOfRowsSkipped));
    }
  }

  /**
   * @param record the values of one record
   * @param keyIndexes positions of the update-key columns within the record
   * @return {@code true} if any update-key value of the record is NULL
   */
  private static boolean hasNullUpdateKey(Object[] record,
      List<Integer> keyIndexes) {
    for (Integer keyIndex : keyIndexes) {
      if (record[keyIndex] == null) {
        return true;
      }
    }
    return false;
  }

  /** Closes the connection, logging (rather than throwing) any failure. */
  private static void closeQuietly(Connection connectionToClose) {
    try {
      connectionToClose.close();
    } catch (SQLException e) {
      LOG.warn("Unable to close the Oracle connection.", e);
    }
  }

  /** Closes the statement, logging (rather than throwing) any failure. */
  private static void closeQuietly(PreparedStatement statementToClose) {
    try {
      statementToClose.close();
    } catch (SQLException e) {
      LOG.warn("Unable to close the Oracle statement.", e);
    }
  }

  /**
   * Builds the INSERT statement used for batch inserts. One value is emitted
   * per column of {@code tableColumns}: a literal for the export pseudo-columns
   * (partition / sub-partition), a {@code urifactory.getUri(...)} wrapper for
   * URITYPE columns and a plain named bind variable otherwise.
   *
   * @param oracleHint optimizer hint text placed after the INSERT keyword; may
   *        be empty
   * @return the SQL text
   */
  private String getBatchInsertSqlStatement(String oracleHint) {

    StringBuilder sqlNames = new StringBuilder();
    StringBuilder sqlValues = new StringBuilder();

    // NOTE: the statement is driven by "tableColumns" (the Oracle table's
    // columns), which may include pseudo-columns that were added to the
    // export table and therefore have no counterpart in the records being
    // read. Those get their value from generateInsertValueForPseudoColumn().

    int colCount = 0;
    for (int idx = 0; idx < this.tableColumns.size(); idx++) {
      OracleTableColumn oracleTableColumn = this.tableColumns.get(idx);
      String columnName = oracleTableColumn.getName();

      // column names...
      if (colCount > 0) {
        sqlNames.append("\n,");
      }
      sqlNames.append(columnName);

      // column values...
      if (colCount > 0) {
        sqlValues.append("\n,");
      }

      String pseudoColumnValue =
          generateInsertValueForPseudoColumn(columnName);

      String bindVarName = null;

      if (pseudoColumnValue != null) {
        bindVarName = pseudoColumnValue;
      } else if (oracleTableColumn.getOracleType() == OracleQueries
          .getOracleType("STRUCT")) {
        if (oracleTableColumn.getDataType().equals(
            OracleJdbcConnectorConstants.Oracle.URITYPE)) {
          bindVarName =
              String.format("urifactory.getUri(%s)",
                  columnNameToBindVariable(columnName));
        }
        // TODO: Date as string? The code this was ported from could
        // optionally wrap DATE / TIMESTAMP / TIMESTAMPTZ / TIMESTAMPLTZ bind
        // variables in to_date(...) / to_timestamp(...) /
        // to_timestamp_tz(...) when timestamps were mapped as strings. That
        // option has not been ported.
      }

      if (bindVarName == null) {
        bindVarName = columnNameToBindVariable(columnName);
      }

      sqlValues.append(bindVarName);

      colCount++;
    }

    String sql =
        String.format("insert %s into %s\n" + "(%s)\n" + "values\n"
            + "(%s)\n", oracleHint, this.table.toString(), sqlNames
            .toString(), sqlValues.toString());

    LOG.info("Batch-Mode insert statement:\n" + sql);
    return sql;
  }

  /**
   * Returns the SQL expression to insert for one of the export pseudo-columns.
   *
   * @param columnName the column being added to the INSERT statement
   * @return a {@code to_date(...)} literal for the export-partition column, the
   *         mapper id for the export-sub-partition column, or {@code null} if
   *         the column is not a pseudo-column
   * @throws RuntimeException if the partition date value is missing from the
   *         context
   */
  private String generateInsertValueForPseudoColumn(String columnName) {

    if (columnName.equalsIgnoreCase(
        OracleJdbcConnectorConstants.COLUMN_NAME_EXPORT_PARTITION)) {

      String partitionValueStr =
          context.getString(
              OracleJdbcConnectorConstants.ORAOOP_EXPORT_PARTITION_DATE_VALUE);
      if (partitionValueStr == null) {
        throw new RuntimeException(
            "Unable to recall the value of the partition date-time.");
      }

      // The value is inlined into the SQL text as a literal, not bound.
      return String.format("to_date('%s', '%s')", partitionValueStr,
          OracleJdbcConnectorConstants.ORAOOP_EXPORT_PARTITION_DATE_FORMAT);
    }

    if (columnName.equalsIgnoreCase(
        OracleJdbcConnectorConstants.COLUMN_NAME_EXPORT_SUBPARTITION)) {
      return Integer.toString(this.mapperId);
    }

    return null;
  }

  /** @return the named bind variable (":" + column name) for the column */
  private String columnNameToBindVariable(String columnName) {
    return ":" + columnName;
  }

  /**
   * Binds one record to the statement and adds it to the current batch. When
   * the table has the mapper-row-number column, the running row number is bound
   * as well and then incremented.
   *
   * @param statement the prepared batch INSERT statement
   * @param columns the schema columns; {@code columns[i]} names {@code array[i]}
   * @param array the values of the record
   * @throws SQLException if a value cannot be bound or batched
   */
  private void configurePreparedStatementColumns(
      PreparedStatement statement, Column[] columns, Object[] array)
      throws SQLException {

    String bindValueName;

    if (this.tableHasMapperRowNumberColumn) {
      bindValueName = columnNameToBindVariable(OracleJdbcConnectorConstants.
          COLUMN_NAME_EXPORT_MAPPER_ROW).replaceFirst(":", "");
      try {
        OracleQueries.setLongAtName(statement, bindValueName,
            this.mapperRowNumber);
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
      this.mapperRowNumber++;
    }

    for (int i = 0; i < array.length; i++) {
      String colName = columns[i].getName();
      bindValueName = columnNameToBindVariable(colName).replaceFirst(":", "");
      OracleTableColumn oracleTableColumn =
          tableColumns.findColumnByName(colName);
      setBindValueAtName(statement, bindValueName, array[i],
          oracleTableColumn);
    }
    statement.addBatch();
  }

  /**
   * Binds a single value by name, choosing the setter from the Oracle type of
   * the destination column.
   *
   * @param statement the prepared batch INSERT statement
   * @param bindValueName the bind variable name (without the leading colon)
   * @param bindValue the value to bind; cast to the Java type that the column's
   *        Oracle type requires
   * @param column the destination column's metadata
   * @throws SQLException if the value cannot be bound
   * @throws UnsupportedOperationException for STRUCT columns other than URITYPE
   */
  private void setBindValueAtName(PreparedStatement statement,
      String bindValueName, Object bindValue, OracleTableColumn column)
      throws SQLException {
    if (column.getOracleType()
        == OracleQueries.getOracleType("NUMBER")) {
      OracleQueries.setBigDecimalAtName(statement, bindValueName,
          (BigDecimal) bindValue);
    } else if (column.getOracleType() == OracleQueries
        .getOracleType("VARCHAR")) {
      OracleQueries.setStringAtName(statement, bindValueName,
          (String) bindValue);
    } else if (column.getOracleType() == OracleQueries
        .getOracleType("TIMESTAMP")
        || column.getOracleType() == OracleQueries
            .getOracleType("TIMESTAMPTZ")
        || column.getOracleType() == OracleQueries
            .getOracleType("TIMESTAMPLTZ")) {
      Object objValue = bindValue;
      if (objValue instanceof LocalDateTime) {
        //TODO: Improve date handling
        LocalDateTime value = (LocalDateTime) objValue;
        Timestamp timestampValue =
            new Timestamp(value.toDateTime().getMillis());
        OracleQueries.setTimestampAtName(statement, bindValueName,
            timestampValue);
      } else {
        String value = (String) objValue;

        // A missing value, or the text "null", is bound as an empty string.
        if (value == null || value.equalsIgnoreCase("null")) {
          value = "";
        }

        OracleQueries.setStringAtName(statement, bindValueName, value);
      }
    } else if (column.getOracleType() == OracleQueries
        .getOracleType("BINARY_DOUBLE")) {
      Double value = (Double) bindValue;
      if (value != null) {
        OracleQueries.setBinaryDoubleAtName(statement, bindValueName,
            value);
      } else {
        OracleQueries.setObjectAtName(statement, bindValueName, null);
      }
    } else if (column.getOracleType() == OracleQueries
        .getOracleType("BINARY_FLOAT")) {
      Float value = (Float) bindValue;
      if (value != null) {
        OracleQueries.setBinaryFloatAtName(statement, bindValueName,
            value);
      } else {
        OracleQueries.setObjectAtName(statement, bindValueName, null);
      }
    } else if (column.getOracleType() == OracleQueries
        .getOracleType("STRUCT")) { // <- E.g. URITYPE
      if (column.getDataType().equals(
          OracleJdbcConnectorConstants.Oracle.URITYPE)) {
        String value = (String) bindValue;
        OracleQueries.setStringAtName(statement, bindValueName, value);
      } else {
        String msg =
            String.format(
                "%s needs to be updated to cope with the data-type: %s "
                    + "where the Oracle data_type is \"%s\".",
                OracleUtilities.getCurrentMethodName(), column.getDataType(),
                column.getOracleType());
        LOG.error(msg);
        throw new UnsupportedOperationException(msg);
      }
    } else {
      // LOB data-types are currently not supported during a Sqoop Export
      // (JIRA: SQOOP-117). Any other type is handed to the driver as a plain
      // object.
      String msg =
          String.format(
              "%s may need to be updated to cope with the data-type: %s",
              OracleUtilities.getCurrentMethodName(), column.getOracleType());
      LOG.debug(msg);

      OracleQueries
          .setObjectAtName(statement, bindValueName, bindValue);
    }
  }

  /**
   * Decides whether the APPEND_VALUES hint may be used automatically.
   *
   * @return {@code true} if the database is Oracle 11.2 or later and the table
   *         has no BINARY_DOUBLE or BINARY_FLOAT column
   */
  private boolean canUseOracleAppendValuesHint() {

    // Should we use the APPEND_VALUES Oracle hint?...
    // (Yes, if this is Oracle 11.2 or above)...
    boolean result = oracleVersion.isGreaterThanOrEqualTo(11, 2, 0, 0);

    // If there is a BINARY_DOUBLE or BINARY_FLOAT column, then we'll avoid
    // using the APPEND_VALUES hint. If there is a NULL in the HDFS file, then
    // we'll encounter
    // "ORA-12838: cannot read/modify an object after modifying it in parallel"
    // due to the JDBC driver issuing the INSERT statement twice to the database
    // without a COMMIT in between (as was observed via WireShark).
    // We're not sure why this happens - we just know how to avoid it.
    if (result) {
      boolean binaryDoubleColumnExists = false;
      boolean binaryFloatColumnExists = false;
      for (int idx = 0; idx < this.tableColumns.size(); idx++) {
        OracleTableColumn oracleTableColumn = this.tableColumns.get(idx);
        if (oracleTableColumn.getOracleType()
            == OracleQueries.getOracleType("BINARY_DOUBLE")) {
          binaryDoubleColumnExists = true;
        }
        if (oracleTableColumn.getOracleType()
            == OracleQueries.getOracleType("BINARY_FLOAT")) {
          binaryFloatColumnExists = true;
        }
      }

      if (binaryDoubleColumnExists || binaryFloatColumnExists) {
        result = false;
        LOG.info("The APPEND_VALUES Oracle hint will not be used for the "
            + "INSERT SQL statement, as the Oracle table "
            + "contains either a BINARY_DOUBLE or BINARY_FLOAT column.");
      }
    }

    return result;
  }

  /**
   * Applies the user's APPEND_VALUES setting on top of the automatic decision.
   *
   * @param jobConfig the job settings holding the user's choice
   * @param useAppendValuesOracleHint the automatic decision
   * @return {@code false} for OFF, {@code true} for ON, and the automatic
   *         decision for AUTO
   * @throws RuntimeException if the setting has any other value
   */
  protected boolean allowUserToOverrideUseOfTheOracleAppendValuesHint(
      ToJobConfig jobConfig, boolean useAppendValuesOracleHint) {

    boolean result = useAppendValuesOracleHint;

    // Has the user forced the use of APPEND_VALUES either on or off?...
    switch (OracleUtilities.getOracleAppendValuesHintUsage(jobConfig)) {

      case OFF:
        result = false;
        LOG.debug(String
            .format(
                "Use of the APPEND_VALUES Oracle hint has been forced OFF. "
                    + "(It was %s to be used).",
                useAppendValuesOracleHint ? "going" : "not going"));
        break;

      case ON:
        result = true;
        LOG.debug(String
            .format(
                "Use of the APPEND_VALUES Oracle hint has been forced ON. "
                    + "(It was %s to be used).",
                useAppendValuesOracleHint ? "going" : "not going"));
        break;

      case AUTO:
        LOG.debug(String.format("The APPEND_VALUES Oracle hint %s be used.",
            result ? "will" : "will not"));
        break;

      default:
        throw new RuntimeException("Invalid value for APPEND_VALUES.");
    }
    return result;
  }
}
// File: connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcPartition.java
// (added as a new file in Apache Sqoop commit fa3c77b6)
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.sqoop.connector.jdbc.oracle;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.sqoop.connector.jdbc.oracle.util.OracleDataChunk;
import org.apache.sqoop.job.etl.Partition;

/**
 * A {@link Partition} made up of a list of {@link OracleDataChunk}s.
 *
 * <p>Only the split id and the data-chunks are serialized by
 * {@link #write(DataOutput)}; the split location and the total block count are
 * not.</p>
 */
public class OracleJdbcPartition extends Partition {

  private int splitId;
  private double totalNumberOfBlocksInAllSplits;
  private String splitLocation;

  // May be null: readFields() stores null when zero chunks were serialized.
  private List<OracleDataChunk> oracleDataChunks;

  // NB: Update write(), readFields() and getDebugDetails() if you add fields
  // here.

  /** Creates an empty partition with split id -1 and no data-chunks. */
  public OracleJdbcPartition() {

    this.splitId = -1;
    this.splitLocation = "";
    this.oracleDataChunks = new ArrayList<OracleDataChunk>();
  }

  /**
   * Creates a partition over the given data-chunks.
   *
   * @param dataChunks the chunks of this partition; the list is stored as-is,
   *        not copied
   */
  public OracleJdbcPartition(List<OracleDataChunk> dataChunks) {

    setOracleDataChunks(dataChunks);
  }

  /**
   * @param dataChunks the chunks of this partition; the list is stored as-is,
   *        not copied
   */
  public void setOracleDataChunks(List<OracleDataChunk> dataChunks) {

    this.oracleDataChunks = dataChunks;
  }

  /** @return the internal list of data-chunks; may be {@code null} */
  public List<OracleDataChunk> getDataChunks() {

    return this.oracleDataChunks;
  }

  /** @return the number of data-chunks, or 0 if there is no chunk list */
  public int getNumberOfDataChunks() {

    if (this.getDataChunks() == null) {
      return 0;
    } else {
      return this.getDataChunks().size();
    }
  }

  /**
   * @return The total number of blocks within the data-chunks of this split
   */
  public long getLength() {

    return this.getTotalNumberOfBlocksInThisSplit();
  }

  /** @return the sum of the block counts of all data-chunks of this split */
  public int getTotalNumberOfBlocksInThisSplit() {

    if (this.getNumberOfDataChunks() == 0) {
      return 0;
    }

    int result = 0;
    for (OracleDataChunk dataChunk : this.getDataChunks()) {
      result += dataChunk.getNumberOfBlocks();
    }

    return result;
  }

  /**
   * Looks up a data-chunk by its id.
   *
   * @param id the chunk id to look for
   * @return the first chunk whose id equals {@code id}, or {@code null} if
   *         there is none (including when this partition has no chunks)
   */
  public OracleDataChunk findDataChunkById(String id) {

    // The chunk list is null after deserializing a partition without chunks;
    // iterating it directly used to throw a NullPointerException.
    if (this.getNumberOfDataChunks() == 0) {
      return null;
    }

    for (OracleDataChunk dataChunk : this.getDataChunks()) {
      if (dataChunk.getId().equals(id)) {
        return dataChunk;
      }
    }
    return null;
  }

  /**
   * Writes the split id, the chunk count and then, per chunk, its class name
   * followed by the chunk's own serialized form.
   *
   * {@inheritDoc}
   */
  @Override
  public void write(DataOutput output) throws IOException {

    output.writeInt(splitId);

    if (this.oracleDataChunks == null) {
      output.writeInt(0);
    } else {
      output.writeInt(this.oracleDataChunks.size());
      for (OracleDataChunk dataChunk : this.oracleDataChunks) {
        // The concrete class name lets readFields() re-create the chunk.
        output.writeUTF(dataChunk.getClass().getName());
        dataChunk.write(output);
      }
    }
  }

  /**
   * Reads what {@link #write(DataOutput)} wrote. A chunk count of zero leaves
   * the chunk list {@code null}.
   *
   * {@inheritDoc}
   *
   * @throws RuntimeException if a chunk's class name cannot be read, or the
   *         class cannot be loaded or instantiated as an
   *         {@link OracleDataChunk}
   */
  @Override
  public void readFields(DataInput input) throws IOException {

    this.splitId = input.readInt();

    int dataChunkCount = input.readInt();
    if (dataChunkCount == 0) {
      this.oracleDataChunks = null;
    } else {
      this.oracleDataChunks =
          new ArrayList<OracleDataChunk>(dataChunkCount);
      for (int idx = 0; idx < dataChunkCount; idx++) {
        OracleDataChunk dataChunk;
        try {
          // asSubclass() checks the type up front, so no unchecked cast is
          // needed.
          dataChunk = Class.forName(input.readUTF())
              .asSubclass(OracleDataChunk.class).newInstance();
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
        dataChunk.readFields(input);
        this.oracleDataChunks.add(dataChunk);
      }
    }
  }

  /** @return a description of this split listing its data-chunks */
  @Override
  public String toString() {

    StringBuilder result = new StringBuilder();

    if (this.getNumberOfDataChunks() == 0) {
      result.append(String.format(
          "Split[%s] does not contain any Oracle data-chunks.", this.splitId));
    } else {
      result.append(String.format(
          "Split[%s] includes the Oracle data-chunks:\n", this.splitId));
      for (OracleDataChunk dataChunk : getDataChunks()) {
        result.append(dataChunk.toString());
      }
    }
    return result.toString();
  }

  protected int getSplitId() {
    return this.splitId;
  }

  protected void setSplitId(int newSplitId) {
    this.splitId = newSplitId;
  }

  protected void setSplitLocation(String newSplitLocation) {
    this.splitLocation = newSplitLocation;
  }

  protected void setTotalNumberOfBlocksInAllSplits(
      int newTotalNumberOfBlocksInAllSplits) {
    this.totalNumberOfBlocksInAllSplits = newTotalNumberOfBlocksInAllSplits;
  }

}
// File: connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcPartitioner.java
// (added as a new file in Apache Sqoop commit fa3c77b6)
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.sqoop.connector.jdbc.oracle;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.log4j.Logger;
import org.apache.sqoop.connector.jdbc.oracle.configuration.FromJobConfig;
import org.apache.sqoop.connector.jdbc.oracle.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.jdbc.oracle.configuration.LinkConfiguration;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleConnectionFactory;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleDataChunk;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleQueries;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleTable;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.PartitionerContext;

/**
 * Splits an Oracle table into {@link OracleJdbcPartition}s by querying the
 * table's data-chunks and distributing them over at most
 * {@code context.getMaxPartitions()} splits.
 */
public class OracleJdbcPartitioner extends
    Partitioner<LinkConfiguration, FromJobConfiguration> {

  private static final Logger LOG =
      Logger.getLogger(OracleJdbcPartitioner.class);

  /**
   * Queries the data-chunks of the configured table and groups them into
   * partitions. The connection opened for the queries is closed before this
   * method returns, whether it succeeds or fails.
   *
   * @param context supplies the maximum number of partitions
   * @param linkConfiguration connection settings
   * @param jobConfiguration source table, partition list and chunking options
   * @return the partitions, each holding one or more data-chunks
   * @throws RuntimeException if the connection or a query fails, or if no
   *         data-chunks were found for the table
   */
  @Override
  public List<Partition> getPartitions(PartitionerContext context,
      LinkConfiguration linkConfiguration,
      FromJobConfiguration jobConfiguration) {
    try {
      Connection connection = OracleConnectionFactory.makeConnection(
          linkConfiguration.connectionConfig);
      try {
        OracleTable table = OracleUtilities.decodeOracleTableName(
            linkConfiguration.connectionConfig.username,
            jobConfiguration.fromJobConfig.tableName);

        long desiredNumberOfMappers = context.getMaxPartitions();
        List<String> partitionList = getPartitionList(
            jobConfiguration.fromJobConfig);

        List<Partition> splits = null;
        try {
          OracleConnectionFactory.initializeOracleConnection(connection,
              linkConfiguration.connectionConfig);

          // The number of chunks generated will *not* be a multiple of the
          // number of splits, to ensure that each split doesn't always get
          // data from the start of each data-file...
          long numberOfChunksPerOracleDataFile =
              (desiredNumberOfMappers * 2) + 1;

          boolean chunkByPartition =
              OracleUtilities.getOraOopOracleDataChunkMethod(
                  jobConfiguration.fromJobConfig).equals(
                  OracleUtilities.OracleDataChunkMethod.PARTITION);

          // Get the Oracle data-chunks for the table...
          List<? extends OracleDataChunk> dataChunks;
          if (chunkByPartition) {
            dataChunks =
                OracleQueries.getOracleDataChunksPartition(connection, table,
                    partitionList);
          } else {
            dataChunks =
                OracleQueries.getOracleDataChunksExtent(connection, table,
                    partitionList, numberOfChunksPerOracleDataFile);
          }

          if (dataChunks.size() == 0) {
            String errMsg;
            if (chunkByPartition) {
              errMsg =
                  String
                      .format(
                          "The table %s does not contain any partitions and you "
                              + "have specified to chunk the table by partitions.",
                          table.getName());
            } else {
              errMsg =
                  String.format("The table %s does not contain any data.", table
                      .getName());
            }
            LOG.fatal(errMsg);
            throw new RuntimeException(errMsg);
          }

          OracleUtilities.OracleBlockToSplitAllocationMethod
              blockAllocationMethod = OracleUtilities
                  .getOracleBlockToSplitAllocationMethod(
                      jobConfiguration.fromJobConfig,
                      OracleUtilities.
                          OracleBlockToSplitAllocationMethod.ROUNDROBIN);

          // Group the Oracle data-chunks into splits...
          splits =
              groupTableDataChunksIntoSplits(dataChunks, desiredNumberOfMappers,
                  blockAllocationMethod);

          // NOTE: the code this was ported from could additionally assign a
          // location to each split from an "oraoop.locations" setting; that
          // has not been ported.
        } catch (SQLException ex) {
          throw new RuntimeException(ex);
        }

        return splits;
      } finally {
        // The connection is only needed while querying the data-chunks;
        // previously it was never closed.
        closeQuietly(connection);
      }
    } catch (SQLException ex) {
      throw new RuntimeException(String.format(
          "Unable to connect to the Oracle database at %s\nError:%s",
          linkConfiguration.connectionConfig.connectionString, ex
              .getMessage()), ex);
    }
  }

  /** Closes the connection, logging (rather than throwing) any failure. */
  private static void closeQuietly(Connection connection) {
    try {
      connection.close();
    } catch (SQLException ex) {
      LOG.warn("Unable to close the Oracle connection.", ex);
    }
  }

  /**
   * @param jobConfig the job settings holding the raw partition list
   * @return the partition list split into its entries by
   *         {@code OracleUtilities.splitOracleStringList}
   */
  private List<String> getPartitionList(FromJobConfig jobConfig) {
    LOG.debug("Partition list = " + jobConfig.partitionList);
    List<String> result =
        OracleUtilities.splitOracleStringList(jobConfig.partitionList);
    if (result != null && result.size() > 0) {
      LOG.debug("Partition filter list: " + result.toString());
    }
    return result;
  }

  /**
   * Distributes data-chunks over {@code min(chunks, desiredNumberOfSplits)}
   * splits.
   *
   * @param dataChunks the chunks to distribute; for RANDOM allocation this
   *        list is shuffled in place
   * @param desiredNumberOfSplits the maximum number of splits to create
   * @param blockAllocationMethod RANDOM and ROUNDROBIN deal the chunks out one
   *        split at a time; SEQUENTIAL gives each split a contiguous run of
   *        chunks
   * @return the splits, as {@link OracleJdbcPartition} instances
   * @throws RuntimeException for an unknown allocation method
   */
  protected static
      List<Partition> groupTableDataChunksIntoSplits(
          List<? extends OracleDataChunk> dataChunks,
          long desiredNumberOfSplits,
          OracleUtilities.OracleBlockToSplitAllocationMethod
              blockAllocationMethod) {

    long numberOfDataChunks = dataChunks.size();
    long actualNumberOfSplits =
        Math.min(numberOfDataChunks, desiredNumberOfSplits);
    long totalNumberOfBlocksInAllDataChunks = 0;
    for (OracleDataChunk dataChunk : dataChunks) {
      totalNumberOfBlocksInAllDataChunks += dataChunk.getNumberOfBlocks();
    }

    String debugMsg = String.format(
        "The table being imported by sqoop has %d blocks "
            + "that have been divided into %d chunks "
            + "which will be processed in %d splits. "
            + "The chunks will be allocated to the splits using the method : %s",
        totalNumberOfBlocksInAllDataChunks, numberOfDataChunks,
        actualNumberOfSplits, blockAllocationMethod.toString());
    LOG.info(debugMsg);

    List<Partition> splits =
        new ArrayList<Partition>((int) actualNumberOfSplits);

    // NOTE: the split id and the total block count are not set on the new
    // partitions (those calls are disabled in the ported code).
    for (int i = 0; i < actualNumberOfSplits; i++) {
      splits.add(new OracleJdbcPartition());
    }

    switch (blockAllocationMethod) {

      case RANDOM:
        // Randomize the order of the data chunks and then "fall through" into
        // the ROUNDROBIN block below...
        Collections.shuffle(dataChunks);

        // NB: No "break;" statement here - we're intentionally falling into the
        // ROUNDROBIN block below...

        //$FALL-THROUGH$
      case ROUNDROBIN:
        int idxSplitRoundRobin = 0;
        for (OracleDataChunk dataChunk : dataChunks) {

          if (idxSplitRoundRobin >= splits.size()) {
            idxSplitRoundRobin = 0;
          }
          OracleJdbcPartition split =
              (OracleJdbcPartition) splits.get(idxSplitRoundRobin++);

          split.getDataChunks().add(dataChunk);
        }
        break;

      case SEQUENTIAL:
        double dataChunksPerSplit = dataChunks.size() / (double) splits.size();
        int dataChunksAllocatedToSplits = 0;

        int idxSplitSeq = 0;
        for (OracleDataChunk dataChunk : dataChunks) {

          OracleJdbcPartition split =
              (OracleJdbcPartition) splits.get(idxSplitSeq);
          split.getDataChunks().add(dataChunk);

          dataChunksAllocatedToSplits++;

          // Move on once this split has its share. The index is capped at the
          // last split so that floating-point rounding can never push it out
          // of range (the previous bound allowed it to reach splits.size()).
          if (dataChunksAllocatedToSplits
              >= (dataChunksPerSplit * (idxSplitSeq + 1))
              && idxSplitSeq < splits.size() - 1) {
            idxSplitSeq++;
          }
        }
        break;

      default:
        throw new RuntimeException("Block allocation method not implemented.");

    }

    if (LOG.isDebugEnabled()) {
      for (int idx = 0; idx < splits.size(); idx++) {
        LOG.debug("\n\t" + splits.get(idx).toString());
      }
    }

    return splits;
  }

}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcToDestroyer.java ---------------------------------------------------------------------- diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcToDestroyer.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcToDestroyer.java new file mode 100644 index 0000000..8429a38 --- /dev/null +++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcToDestroyer.java @@ -0,0 +1,273 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.sqoop.connector.jdbc.oracle;

import java.sql.Connection;
import java.sql.SQLException;

import org.apache.log4j.Logger;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.connector.jdbc.oracle.configuration.ConnectionConfig;
import org.apache.sqoop.connector.jdbc.oracle.configuration.LinkConfiguration;
import org.apache.sqoop.connector.jdbc.oracle.configuration.ToJobConfig;
import org.apache.sqoop.connector.jdbc.oracle.configuration.ToJobConfiguration;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleConnectionFactory;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleQueries;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleQueries.CreateExportChangesTableOptions;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleTable;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleTableColumns;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities.ExportTableUpdateTechnique;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities.InsertMode;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities.UpdateMode;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;

/**
 * Destroyer for "to" (export) jobs of the Oracle JDBC connector.
 *
 * <p>Moves the rows that were written to the per-mapper tables into the main
 * export table (by exchanging subpartitions, or by update/merge/re-insert
 * when an update key is configured) and drops the per-mapper tables.</p>
 */
public class OracleJdbcToDestroyer extends Destroyer<LinkConfiguration, ToJobConfiguration> {

  private static final Logger LOG =
      Logger.getLogger(OracleJdbcToDestroyer.class);

  protected Connection connection;
  protected OracleTable table;
  // Number of per-mapper tables processed by this destroyer (hard-coded).
  protected int numMappers = 8;

  /**
   * Opens the connection used by this destroyer.
   *
   * @throws RuntimeException if the connection cannot be established
   */
  private void connect(ConnectionConfig connectionConfig) {
    try {
      connection = OracleConnectionFactory.makeConnection(connectionConfig);
    } catch (SQLException ex) {
      throw new RuntimeException(String.format(
          "Unable to connect to the Oracle database at %s\n"
              + "Error:%s", connectionConfig.connectionString,
          ex.getMessage()), ex);
    }
  }

  /**
   * Closes the connection opened by {@link #connect(ConnectionConfig)}, if
   * any. A failure to close is logged, not propagated, so it cannot hide the
   * outcome of the destroy step.
   */
  private void closeConnection() {
    if (connection == null) {
      return;
    }
    try {
      connection.close();
    } catch (SQLException ex) {
      LOG.warn("Unable to close the Oracle connection used by the destroyer.",
          ex);
    } finally {
      connection = null;
    }
  }

  /**
   * Finalizes the export.
   *
   * <ul>
   * <li>No update key and insert mode {@code ExchangePartition}: each mapper
   * table is exchanged into its subpartition of the export table.</li>
   * <li>No update key and any other insert mode: nothing to do.</li>
   * <li>Update key configured: the export table is updated from the mapper
   * tables.</li>
   * </ul>
   *
   * <p>Any connection opened along the way is closed before returning.</p>
   *
   * @throws RuntimeException if the database cannot be reached or the
   *         export table cannot be updated
   */
  @Override
  public void destroy(DestroyerContext context,
      LinkConfiguration linkConfiguration,
      ToJobConfiguration jobConfiguration) {
    LOG.debug("Running Oracle JDBC connector destroyer");

    table = OracleUtilities.decodeOracleTableName(
        linkConfiguration.connectionConfig.username,
        jobConfiguration.toJobConfig.tableName);

    try {
      if (jobConfiguration.toJobConfig.updateKey == null
          || jobConfiguration.toJobConfig.updateKey.isEmpty()) {

        // Is each mapper inserting rows into a unique table?...
        InsertMode insertMode = OracleUtilities.getExportInsertMode(
            jobConfiguration.toJobConfig, context.getContext());

        if (insertMode == InsertMode.ExchangePartition) {
          connect(linkConfiguration.connectionConfig);
          Object sysDateTime =
              OracleUtilities.recallOracleDateTime(context.getContext());

          exchangePartitionUniqueMapperTableDataIntoMainExportTable(
              sysDateTime);
        }

      } else {
        connect(linkConfiguration.connectionConfig);
        Object sysDateTime =
            OracleUtilities.recallOracleDateTime(context.getContext());
        try {
          updateMainExportTableFromUniqueMapperTable(
              jobConfiguration.toJobConfig, context.getContext(), sysDateTime);
        } catch (SQLException e) {
          throw new RuntimeException(
              String.format(
                  "Unable to update the table %s.", table.toString()), e);
        }
      }
    } finally {
      closeConnection();
    }
  }

  /**
   * Exchanges every mapper table into the matching subpartition of the
   * export table and then drops the mapper table.
   *
   * @param sysDateTime the job's Oracle date-time, used to derive the mapper
   *        table and subpartition names
   * @throws RuntimeException if an exchange or drop fails
   */
  private void exchangePartitionUniqueMapperTableDataIntoMainExportTable(
      Object sysDateTime) {

    for (int i = 0; i < numMappers; i++) {
      long start = System.nanoTime();

      OracleTable mapperTable =
          OracleUtilities.generateExportTableMapperTableName(
              i, sysDateTime, null);

      String subPartitionName =
          OracleUtilities.generateExportTableSubPartitionName(
              i, sysDateTime);

      try {
        OracleQueries.exchangeSubpartition(connection,
            table, subPartitionName, mapperTable);

        double timeInSec = (System.nanoTime() - start) / Math.pow(10, 9);
        LOG.info(String
            .format(
                "Time spent performing an \"exchange subpartition with "
                    + "table\": %f sec.",
                timeInSec));

        LOG.debug(String.format("Dropping temporary mapper table %s",
            mapperTable.toString()));
        OracleQueries.dropTable(connection, mapperTable);
      } catch (SQLException ex) {
        throw new RuntimeException(
            String
                .format(
                    "Unable to perform an \"exchange subpartition\" operation "
                        + "for the table %s, for the subpartition named "
                        + "\"%s\" with the table named \"%s\".",
                    table.toString(), subPartitionName,
                    mapperTable.toString()), ex);
      }
    }
  }

  /**
   * Applies the rows of every mapper table to the export table.
   *
   * <p>For each mapper a "changes" table is built from the mapper table and
   * the export table; its rows are then applied using the configured update
   * technique. The changes table and the mapper table are dropped afterwards
   * whether or not the mapper's rows were applied successfully.</p>
   *
   * @param jobConfig the "to" job settings
   * @param context job context, used to pick the update technique
   * @param sysDateTime the job's Oracle date-time, used to derive table names
   * @throws SQLException if any statement fails; the transaction is rolled
   *         back first
   * @throws RuntimeException if the update technique is not handled
   */
  private void updateMainExportTableFromUniqueMapperTable(ToJobConfig jobConfig,
      ImmutableContext context, Object sysDateTime)
      throws SQLException {

    String[] updateColumnNames = OracleUtilities.
        getExportUpdateKeyColumnNames(jobConfig);

    OracleTableColumns tableColumns = OracleQueries.getToTableColumns(
        connection, table, true, false);

    UpdateMode updateMode = OracleUtilities.getExportUpdateMode(jobConfig);

    ExportTableUpdateTechnique exportTableUpdateTechnique =
        OracleUtilities.getExportTableUpdateTechnique(context, updateMode);

    CreateExportChangesTableOptions changesTableOptions;
    boolean parallelizationEnabled =
        OracleUtilities.enableOracleParallelProcessingDuringExport(jobConfig);

    switch (exportTableUpdateTechnique) {

      case ReInsertUpdatedRows:
      case UpdateSql:
        changesTableOptions =
            CreateExportChangesTableOptions.OnlyRowsThatDiffer;
        break;

      case ReInsertUpdatedRowsAndNewRows:
      case MergeSql:
        changesTableOptions =
            CreateExportChangesTableOptions.RowsThatDifferPlusNewRows;
        break;

      default:
        throw new RuntimeException(String.format(
            "Update %s to cater for the ExportTableUpdateTechnique \"%s\".",
            OracleUtilities.getCurrentMethodName(),
            exportTableUpdateTechnique.toString()));
    }

    String temporaryTableStorageClause =
        OracleUtilities.getTemporaryTableStorageClause(jobConfig);

    for (int i = 0; i < numMappers; i++) {

      OracleTable mapperTable =
          OracleUtilities.generateExportTableMapperTableName(
              i, sysDateTime, null);

      OracleTable changesTable =
          OracleUtilities.generateExportTableMapperTableName(Integer
              .toString(i) + "_CHG", sysDateTime, null);

      try {
        int changeTableRowCount =
            OracleQueries.createExportChangesTable(connection,
                changesTable, temporaryTableStorageClause, mapperTable,
                table, updateColumnNames, changesTableOptions,
                parallelizationEnabled);

        if (changeTableRowCount == 0) {
          LOG.debug(String.format(
              "The changes-table does not contain any rows. %s is now exiting.",
              OracleUtilities.getCurrentMethodName()));
          // The finally block below still drops this mapper's tables.
          continue;
        }

        switch (exportTableUpdateTechnique) {

          case ReInsertUpdatedRows:
          case ReInsertUpdatedRowsAndNewRows:

            OracleQueries.deleteRowsFromTable(connection,
                table, changesTable, updateColumnNames,
                parallelizationEnabled);

            OracleQueries.insertRowsIntoExportTable(connection,
                table, changesTable, sysDateTime, i,
                parallelizationEnabled);
            break;

          case UpdateSql:

            long start = System.nanoTime();

            OracleQueries.updateTable(connection, table,
                changesTable, updateColumnNames, tableColumns, sysDateTime, i,
                parallelizationEnabled);

            double timeInSec = (System.nanoTime() - start) / Math.pow(10, 9);
            LOG.info(String.format("Time spent performing an update: %f sec.",
                timeInSec));
            break;

          case MergeSql:

            long mergeStart = System.nanoTime();

            OracleQueries.mergeTable(connection, table,
                changesTable, updateColumnNames, tableColumns, sysDateTime,
                i, parallelizationEnabled);

            double mergeTimeInSec = (System.nanoTime() - mergeStart)
                / Math.pow(10, 9);
            LOG.info(String.format("Time spent performing a merge: %f sec.",
                mergeTimeInSec));

            break;

          default:
            throw new RuntimeException(
                String.format(
                    "Update %s to cater for the ExportTableUpdateTechnique \"%s\".",
                    OracleUtilities.getCurrentMethodName(),
                    exportTableUpdateTechnique.toString()));
        }

        connection.commit();
      } catch (SQLException ex) {
        // Roll back, but never let a failing rollback replace the exception
        // that describes what actually went wrong.
        try {
          connection.rollback();
        } catch (SQLException rollbackEx) {
          LOG.warn(String.format(
              "Unable to roll back the changes made to the table %s.",
              table.toString()), rollbackEx);
        }
        throw ex;
      } finally {
        OracleQueries.dropTable(connection, changesTable);
        LOG.debug(String.format("Dropping temporary mapper table %s",
            mapperTable.toString()));
        OracleQueries.dropTable(connection, mapperTable);
      }
    }
  }

}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcToInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcToInitializer.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcToInitializer.java new file mode 100644 index 0000000..f1d92f0 --- /dev/null +++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcToInitializer.java @@ -0,0 +1,498 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.sqoop.connector.jdbc.oracle;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Date;
import java.util.List;

import org.apache.commons.lang.BooleanUtils;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.connector.jdbc.oracle.configuration.ConnectionConfig;
import org.apache.sqoop.connector.jdbc.oracle.configuration.LinkConfiguration;
import org.apache.sqoop.connector.jdbc.oracle.configuration.ToJobConfig;
import org.apache.sqoop.connector.jdbc.oracle.configuration.ToJobConfiguration;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleQueries;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleTable;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleTablePartition;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleTablePartitions;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities.UpdateMode;
import org.apache.sqoop.job.etl.InitializerContext;

/**
 * Initializer for "to" (export) jobs of the Oracle JDBC connector.
 *
 * <p>Validates the export options and creates whatever Oracle objects the
 * export needs before the mappers start: the export table (from a template),
 * extra partitions on an existing export table, and the per-mapper tables.</p>
 */
public class OracleJdbcToInitializer extends
    OracleJdbcCommonInitializer<ToJobConfiguration> {

  private static final Logger LOG =
      Logger.getLogger(OracleJdbcToInitializer.class);

  // Floating-point on purpose: ages are compared in fractional days.
  private static final double MILLIS_PER_DAY = 1000.0 * 60 * 60 * 24;

  /**
   * Connects via the superclass and resolves the export table from the
   * connection user name and the configured table name.
   */
  @Override
  public void connect(InitializerContext context,
      LinkConfiguration linkConfiguration, ToJobConfiguration jobConfiguration)
      throws SQLException {
    super.connect(context, linkConfiguration, jobConfiguration);
    table = OracleUtilities.decodeOracleTableName(
        linkConfiguration.connectionConfig.username,
        jobConfiguration.toJobConfig.tableName);
  }

  /**
   * Creates the Oracle objects required by the export and verifies that the
   * export target is an Oracle table.
   *
   * @throws RuntimeException if the target is not an Oracle table, if the
   *         options are inconsistent, or if a query fails
   */
  @Override
  public void initialize(InitializerContext context,
      LinkConfiguration linkConfiguration,
      ToJobConfiguration jobConfiguration) {
    super.initialize(context, linkConfiguration, jobConfiguration);
    LOG.debug("Running Oracle JDBC connector initializer");
    try {
      createAnyRequiredOracleObjects(context.getContext(),
          jobConfiguration.toJobConfig, linkConfiguration.connectionConfig);

      if (!isSqoopTableAnOracleTable(connection,
          linkConfiguration.connectionConfig.username, table)) {
        throw new RuntimeException("Can only load data into Oracle tables.");
      }
    } catch (SQLException ex) {
      throw new RuntimeException(ex);
    }
  }

  /**
   * Validates the export options and creates the export table, additional
   * partitions and per-mapper tables as required.
   *
   * <p>The database SYSDATE is read once and stored in {@code context} (both
   * as a plain date string and as the partition value) so the rest of the
   * job can derive the same names and values from it.</p>
   *
   * @param context job context that receives the SYSDATE-derived values
   * @param jobConfig the "to" job settings
   * @param connectionConfig connection settings (only the user name is read)
   * @throws SQLException if a query fails
   * @throws RuntimeException if the options are inconsistent or a required
   *         object is missing / already exists
   */
  private void createAnyRequiredOracleObjects(MutableContext context,
      ToJobConfig jobConfig, ConnectionConfig connectionConfig)
      throws SQLException {

    // The SYSDATE on the Oracle database will be used as the partition value
    // for this export job...
    Object sysDateTime = OracleQueries.getSysDate(connection);
    String sysDateStr =
        OracleQueries.oraDATEToString(sysDateTime, "yyyy-mm-dd hh24:mi:ss");
    context.setString(OracleJdbcConnectorConstants.SQOOP_ORACLE_JOB_SYSDATE,
        sysDateStr);

    checkForOldOraOopTemporaryOracleTables(connection, sysDateTime,
        OracleQueries.getCurrentSchema(connection));

    // Store the actual partition value, so the N mappers know what value to
    // insert...
    String partitionValue =
        OracleQueries.oraDATEToString(sysDateTime,
            OracleJdbcConnectorConstants.ORAOOP_EXPORT_PARTITION_DATE_FORMAT);
    context.setString(
        OracleJdbcConnectorConstants.ORAOOP_EXPORT_PARTITION_DATE_VALUE,
        partitionValue);

    // Generate the (22 character) partition name...
    String partitionName =
        OracleUtilities
            .createExportTablePartitionNameFromOracleTimestamp(sysDateTime);

    //TODO: Number of mappers needs to be fixed
    int numMappers = 8;

    String exportTableTemplate = jobConfig.templateTable;

    if (exportTableTemplate == null) {
      exportTableTemplate = "";
    }

    String user = connectionConfig.username;
    //TODO: This is from the other Oracle Manager
    //if (user == null) {
    //  user = OracleManager.getSessionUser(connection);
    //}

    OracleTable templateTableContext =
        OracleUtilities.decodeOracleTableName(user, exportTableTemplate);

    boolean noLoggingOnNewTable = BooleanUtils.isTrue(jobConfig.nologging);

    String updateKeyCol = jobConfig.updateKey;

    /* =========================== */
    /* VALIDATION OF INPUTS */
    /* =========================== */

    if (updateKeyCol == null || updateKeyCol.isEmpty()) {
      // We're performing an "insert" export, not an "update" export.

      // Check that the "oraoop.export.merge" property has not been specified,
      // as this would be
      // an invalid scenario...
      if (OracleUtilities.getExportUpdateMode(jobConfig) == UpdateMode.Merge) {
        throw new RuntimeException("The merge option can only be used if "
            + "an update key is specified.");
      }
    }

    if (OracleUtilities
        .userWantsToCreatePartitionedExportTableFromTemplate(jobConfig)
        || OracleUtilities
            .userWantsToCreateNonPartitionedExportTableFromTemplate(jobConfig)) {

      // OraOop will create the export table.

      if (table.getName().length()
          > OracleJdbcConnectorConstants.Oracle.MAX_IDENTIFIER_LENGTH) {
        String msg =
            String.format(
                "The Oracle table name \"%s\" is longer than %d characters.\n"
                    + "Oracle will not allow a table with this name to be created.",
                table.getName(),
                OracleJdbcConnectorConstants.Oracle.MAX_IDENTIFIER_LENGTH);
        throw new RuntimeException(msg);
      }

      if (updateKeyCol != null && !updateKeyCol.isEmpty()) {

        // We're performing an "update" export, not an "insert" export.

        // Check whether the user is attempting an "update" (i.e. a non-merge).
        // If so, they're
        // asking to only UPDATE rows in a (about to be created) (empty) table
        // that contains no rows.
        // This will be a waste of time, as we'd be attempting to perform UPDATE
        // operations against a
        // table with no rows in it...
        UpdateMode updateMode = OracleUtilities.getExportUpdateMode(jobConfig);
        if (updateMode == UpdateMode.Update) {
          // The offending combination is template table + *update* mode;
          // merge mode is the valid alternative, so the message must not
          // blame the merge option.
          throw new RuntimeException(
              "\n\nCombining the template table option with the update "
                  + "(non-merge) option is nonsensical, as this would create an "
                  + "empty table and then perform "
                  + "a lot of work that results in a table containing no rows.\n");
        }
      }

      // Check that the specified template table actually exists and is a
      // table...
      String templateTableObjectType =
          OracleQueries.getOracleObjectType(connection,
              templateTableContext);
      if (templateTableObjectType == null) {
        throw new RuntimeException(String.format(
            "The specified Oracle template table \"%s\" does not exist.",
            templateTableContext.toString()));
      }

      if (!templateTableObjectType.equalsIgnoreCase(
          OracleJdbcConnectorConstants.Oracle.OBJECT_TYPE_TABLE)) {
        throw new RuntimeException(
            String.format(
                "The specified Oracle template table \"%s\" is not an "
                    + "Oracle table, it's a %s.",
                templateTableContext.toString(), templateTableObjectType));
      }

      if (BooleanUtils.isTrue(jobConfig.dropTableIfExists)) {
        OracleQueries.dropTable(connection, table);
      }

      // Check that there is no existing database object with the same name of
      // the table to be created...
      String newTableObjectType =
          OracleQueries.getOracleObjectType(connection, table);
      if (newTableObjectType != null) {
        throw new RuntimeException(
            String.format(
                "%s cannot create a new Oracle table named %s as a \"%s\" "
                    + "with this name already exists.",
                OracleJdbcConnectorConstants.CONNECTOR_NAME, table.toString(),
                newTableObjectType));
      }
    } else {
      // The export table already exists.

      if (updateKeyCol != null && !updateKeyCol.isEmpty()) {

        // We're performing an "update" export, not an "insert" export.

        // Check that there exists an index on the export table on the
        // update-key column(s).
        // Without such an index, this export may perform like a real dog...
        String[] updateKeyColumns =
            OracleUtilities.getExportUpdateKeyColumnNames(jobConfig);
        if (!OracleQueries.doesIndexOnColumnsExist(connection,
            table, updateKeyColumns)) {
          String msg = String.format(
              "\n**************************************************************"
                  + "***************************************************************"
                  + "\n\tThe table %1$s does not have a valid index on "
                  + "the column(s) %2$s.\n"
                  + "\tAs a consequence, this export may take a long time to "
                  + "complete.\n"
                  + "\tIf performance is unacceptable, consider reattempting this "
                  + "job after creating an index "
                  + "on this table via the SQL...\n"
                  + "\t\tcreate index <index_name> on %1$s(%2$s);\n"
                  + "****************************************************************"
                  + "*************************************************************",
              table.toString(),
              OracleUtilities.stringArrayToCSV(updateKeyColumns));
          LOG.warn(msg);
        }
      }
    }

    boolean createMapperTables = false;

    if (updateKeyCol != null && !updateKeyCol.isEmpty()) {
      createMapperTables = true;
    }

    if (OracleUtilities
        .userWantsToCreatePartitionedExportTableFromTemplate(jobConfig)) {
      /* ================================= */
      /* CREATE A PARTITIONED TABLE */
      /* ================================= */

      // Create a new Oracle table using the specified template...

      String[] subPartitionNames =
          OracleUtilities.generateExportTableSubPartitionNames(numMappers,
              sysDateTime);
      // Create the export table from a template table...
      String tableStorageClause =
          OracleUtilities.getExportTableStorageClause(jobConfig);

      OracleQueries.createExportTableFromTemplateWithPartitioning(
          connection, table,
          tableStorageClause, templateTableContext, noLoggingOnNewTable,
          partitionName, sysDateTime, numMappers,
          subPartitionNames);

      createMapperTables = true;
    } else if (OracleUtilities
        .userWantsToCreateNonPartitionedExportTableFromTemplate(jobConfig)) {
      /* ===================================== */
      /* CREATE A NON-PARTITIONED TABLE */
      /* ===================================== */
      String tableStorageClause =
          OracleUtilities.getExportTableStorageClause(jobConfig);

      OracleQueries.createExportTableFromTemplate(connection,
          table, tableStorageClause,
          templateTableContext, noLoggingOnNewTable);
    } else {
      /* ===================================================== */
      /* ADD ADDITIONAL PARTITIONS TO AN EXISTING TABLE */
      /* ===================================================== */

      // If the export table is partitioned, and the partitions were created by
      // OraOop, then we need
      // create additional partitions...

      OracleTablePartitions tablePartitions =
          OracleQueries.getPartitions(connection, table);
      // Find any partition name starting with "ORAOOP_"...
      OracleTablePartition oraOopPartition =
          tablePartitions.findPartitionByRegEx("^"
              + OracleJdbcConnectorConstants.
                  EXPORT_TABLE_PARTITION_NAME_PREFIX);

      if (tablePartitions.size() > 0 && oraOopPartition == null) {

        for (int idx = 0; idx < tablePartitions.size(); idx++) {
          LOG.info(String.format(
              "The Oracle table %s has a partition named \"%s\".",
              table.toString(),
              tablePartitions.get(idx).getName()));
        }

        LOG.warn(String.format(
            "The Oracle table %s is partitioned.\n"
                + "These partitions were not created by %s.",
            table.toString(),
            OracleJdbcConnectorConstants.CONNECTOR_NAME));
      }

      if (oraOopPartition != null) {

        // Indicate in the configuration what's happening...
        context.setBoolean(OracleJdbcConnectorConstants.
            EXPORT_TABLE_HAS_SQOOP_PARTITIONS, true);

        LOG.info(String.format(
            "The Oracle table %s is partitioned.\n"
                + "These partitions were created by %s, so "
                + "additional partitions will now be created.\n"
                + "The name of the new partition will be \"%s\".",
            table.toString(), OracleJdbcConnectorConstants.
                CONNECTOR_NAME, partitionName));

        String[] subPartitionNames =
            OracleUtilities.generateExportTableSubPartitionNames(numMappers,
                sysDateTime);

        // Add another partition (and N subpartitions) to this existing,
        // partitioned export table...
        OracleQueries.createMoreExportTablePartitions(connection,
            table, partitionName,
            sysDateTime, subPartitionNames);

        createMapperTables = true;
      }
    }

    if (createMapperTables) {
      createUniqueMapperTable(sysDateTime, numMappers, jobConfig);
    }
  }

  /**
   * Creates one table per mapper, dropping any table of the same name first.
   *
   * @param sysDateTime the job's Oracle date-time, used to derive table names
   * @param numMappers how many mapper tables to create
   * @param jobConfig the "to" job settings (storage clause)
   * @throws SQLException if a drop or create fails
   */
  private void createUniqueMapperTable(Object sysDateTime,
      int numMappers, ToJobConfig jobConfig)
      throws SQLException {

    // Mappers insert data into a unique table before either:
    // - exchanging it into a subpartition of the 'real' export table; or
    // - merging it into the 'real' export table.

    for (int i = 0; i < numMappers; i++) {
      OracleTable mapperTable =
          OracleUtilities.generateExportTableMapperTableName(i,
              sysDateTime, null);

      // If this mapper is being reattempted in response to a failure, we need
      // to delete the
      // temporary table created by the previous attempt...
      OracleQueries.dropTable(connection, mapperTable);

      String temporaryTableStorageClause =
          OracleUtilities.getTemporaryTableStorageClause(jobConfig);

      OracleQueries.createExportTableForMapper(connection,
          mapperTable, temporaryTableStorageClause, table
          , false); // <- addOraOopPartitionColumns

      LOG.debug(String.format("Created temporary mapper table %s", mapperTable
          .toString()));
    }
  }

  /**
   * Logs the names of mapper tables that are more than one day older than
   * {@code sysDateTime}.
   *
   * <p>This is a best-effort diagnostic: any failure is logged as a warning
   * and otherwise ignored, so it can never fail the job.</p>
   *
   * @param connection connection used to list candidate tables
   * @param sysDateTime the current Oracle date-time
   * @param schema schema to search
   */
  private void checkForOldOraOopTemporaryOracleTables(Connection connection,
      Object sysDateTime, String schema) {

    try {

      StringBuilder message = new StringBuilder();
      message
          .append(String.format(
              "The following tables appear to be old temporary tables created by "
                  + "%s that have not been deleted.\n"
                  + "They are probably left over from jobs that encountered an error and "
                  + "could not clean up after themselves.\n"
                  + "You might want to drop these Oracle tables in order to reclaim "
                  + "Oracle storage space:\n",
              OracleJdbcConnectorConstants.CONNECTOR_NAME));
      boolean showMessage = false;

      // Turn a generated mapper-table name into a LIKE pattern by replacing
      // every run of digits with a single '%'.
      String generatedTableName =
          OracleUtilities.generateExportTableMapperTableName(0, sysDateTime,
              schema).getName();
      generatedTableName = generatedTableName.replaceAll("[0-9]", "%");
      generatedTableName =
          OracleUtilities.replaceAll(generatedTableName, "%%", "%");
      Date sysDate = OracleQueries.oraDATEToDate(sysDateTime);

      List<OracleTable> tables =
          OracleQueries.getTablesWithTableNameLike(connection, schema,
              generatedTableName);

      for (OracleTable oracleTable : tables) {
        OracleUtilities.DecodedExportMapperTableName tableName =
            OracleUtilities.decodeExportTableMapperTableName(oracleTable);
        if (tableName != null) {
          Date tableDate =
              OracleQueries.oraDATEToDate(tableName.getTableDateTime());
          // Divide in floating point: integer division would truncate to
          // whole days and only report tables that are at least two days old.
          double daysApart =
              (sysDate.getTime() - tableDate.getTime()) / MILLIS_PER_DAY;
          if (daysApart > 1.0) {
            showMessage = true;
            message.append(String.format("\t%s\n", oracleTable.toString()));
          }
        }
      }

      if (showMessage) {
        LOG.info(message.toString());
      }
    } catch (Exception ex) {
      LOG.warn(String.format(
          "%s was unable to check for the existance of old "
              + "temporary Oracle tables.\n" + "Error:\n%s",
          OracleJdbcConnectorConstants.CONNECTOR_NAME, ex.toString()));
    }
  }

  /**
   * Tells whether the export target is an Oracle table.
   *
   * @param connection connection used for the lookups
   * @param connectionUserName user name, only used in the log message
   * @param tableContext the schema/name of the object to check
   * @return {@code true} if the object is found as a table, if its object
   *         type is the table type, or if the lookup itself failed (in which
   *         case a table is assumed); {@code false} if the object does not
   *         exist or is of another type
   */
  private boolean isSqoopTableAnOracleTable(Connection connection,
      String connectionUserName, OracleTable tableContext) {

    String oracleObjectType;

    try {

      // Find the table via dba_tables...
      OracleTable oracleTable =
          OracleQueries.getTable(connection, tableContext.getSchema(),
              tableContext.getName());
      if (oracleTable != null) {
        return true;
      }

      // If we could not find the table via dba_tables, then try and determine
      // what type of database object the
      // user was referring to. Perhaps they've specified the name of a view?...
      oracleObjectType =
          OracleQueries.getOracleObjectType(connection, tableContext);

      if (oracleObjectType == null) {
        LOG.info(String.format(
            "%1$s will not process this Sqoop connection, "
                + "as the Oracle user %2$s does not own a table named %3$s.\n"
                + "\tPlease prefix the table name with the owner.\n "
                + "\tNote: You may need to double-quote the owner and/or table name."
                + "\n\tE.g. sqoop ... --username %4$s --table %2$s.%3$s\n",
            OracleJdbcConnectorConstants.CONNECTOR_NAME, tableContext.getSchema(),
            tableContext.getName(), connectionUserName));
        return false;
      }

    } catch (SQLException ex) {
      LOG.warn(String.format(
          "Unable to determine the Oracle-type of the object named %s owned by "
              + "%s.\nError:\n" + "%s", tableContext.getName(), tableContext
              .getSchema(), ex.getMessage()));

      // In the absence of conflicting information, let's assume the object is
      // actually a table...
      return true;
    }

    boolean result =
        oracleObjectType.equalsIgnoreCase(
            OracleJdbcConnectorConstants.Oracle.OBJECT_TYPE_TABLE);

    if (!result) {
      LOG.info(String.format("%s will not process this sqoop connection, "
          + "as %s is not an Oracle table, it's a %s.",
          OracleJdbcConnectorConstants.CONNECTOR_NAME, tableContext.toString(),
          oracleObjectType));
    }

    return result;
  }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ConnectionConfig.java ---------------------------------------------------------------------- diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ConnectionConfig.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ConnectionConfig.java new file mode 100644 index 0000000..c355a77 --- /dev/null +++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ConnectionConfig.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc.oracle.configuration; + +import org.apache.sqoop.connector.jdbc.oracle.util.OracleConnectionFactory; +import org.apache.sqoop.model.ConfigClass; +import org.apache.sqoop.model.Input; +import org.apache.sqoop.model.Validator; +import org.apache.sqoop.validation.Status; +import org.apache.sqoop.validation.validators.AbstractValidator; +import org.apache.sqoop.validation.validators.StartsWith; + +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Map; + +/** + * Holds the {@code @Input} fields describing an Oracle connection: a connection string that must start with "jdbc:", username, password (marked sensitive), JDBC properties, time zone, action name, fetch size, initialization statements, verbatim-URL flag and RAC service name. The nested {@code ConfigValidator} calls {@code OracleConnectionFactory.makeConnection} and adds a WARNING message if that throws {@code SQLException}. + */ +@ConfigClass(validators = {@Validator(ConnectionConfig.ConfigValidator.class)}) +public class ConnectionConfig { + @Input(size = 128, validators = {@Validator(value = StartsWith.class, strArg = "jdbc:")} ) + public String connectionString; + + @Input(size = 40) + public String username; + + @Input(size = 40, sensitive = true) + public String password; + + @Input + public Map<String, String> jdbcProperties; + + @Input + public String timeZone; + + @Input + public String actionName; + + @Input + public Integer fetchSize; + + @Input + public String initializationStatements; + + @Input + public Boolean jdbcUrlVerbatim; + + @Input + public String racServiceName; + + public static class ConfigValidator extends AbstractValidator<ConnectionConfig> { + @Override + public void validate(ConnectionConfig linkConfig) { + // See if we can connect to the database. NOTE(review): whatever makeConnection returns is discarded; if it is an open connection it is never closed here - confirm against OracleConnectionFactory. + try
{ + OracleConnectionFactory.makeConnection(linkConfig); + } catch (SQLException e) { + addMessage(Status.WARNING, "Can't connect to the database with given credentials: " + e.getMessage()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/FromJobConfig.java ---------------------------------------------------------------------- diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/FromJobConfig.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/FromJobConfig.java new file mode 100644 index 0000000..38c808f --- /dev/null +++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/FromJobConfig.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.sqoop.connector.jdbc.oracle.configuration; + +import org.apache.sqoop.model.ConfigClass; +import org.apache.sqoop.model.Input; +import org.apache.sqoop.model.Validator; +import org.apache.sqoop.validation.validators.NotEmpty; + +/** + * Holds the {@code @Input} fields of the FROM-job configuration: table name, consistent-read flag and SCN, partition list, data-chunk method and allocation method, where-clause location, LOB-omission flag, query hint and conditions. Only {@code tableName} declares a validator ({@code NotEmpty}). + */ +@ConfigClass +public class FromJobConfig { + + @Input(size = 2000, validators = { @Validator(NotEmpty.class)}) + public String tableName; + + @Input + public Boolean consistentRead; + + @Input + public Long consistentReadScn; + + @Input(size = 2000) + public String partitionList; + + @Input(size = 2000) + public String dataChunkMethod; + + @Input(size = 2000) + public String dataChunkAllocationMethod; + + @Input(size = 2000) + public String whereClauseLocation; + + @Input + public Boolean omitLobColumns; + + @Input + public String queryHint; + + @Input(size = 2000) + public String conditions; + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/FromJobConfiguration.java ---------------------------------------------------------------------- diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/FromJobConfiguration.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/FromJobConfiguration.java new file mode 100644 index 0000000..6a6c1aa --- /dev/null +++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/FromJobConfiguration.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc.oracle.configuration; + +import org.apache.sqoop.model.ConfigurationClass; +import org.apache.sqoop.model.Config; + +/** + * Configuration class with a single {@code @Config} member, {@code fromJobConfig}, which the no-argument constructor initializes to a new {@code FromJobConfig}. + */ +@ConfigurationClass +public class FromJobConfiguration { + @Config public FromJobConfig fromJobConfig; + + public FromJobConfiguration() { + fromJobConfig = new FromJobConfig(); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/LinkConfiguration.java ---------------------------------------------------------------------- diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/LinkConfiguration.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/LinkConfiguration.java new file mode 100644 index 0000000..990343b --- /dev/null +++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/LinkConfiguration.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc.oracle.configuration; + +import org.apache.sqoop.model.ConfigurationClass; +import org.apache.sqoop.model.Config; + +/** + * Configuration class with a single {@code @Config} member, {@code connectionConfig}, which the no-argument constructor initializes to a new {@code ConnectionConfig}. + */ +@ConfigurationClass +public class LinkConfiguration { + + @Config public ConnectionConfig connectionConfig; + + public LinkConfiguration() { + connectionConfig = new ConnectionConfig(); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ToJobConfig.java ---------------------------------------------------------------------- diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ToJobConfig.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ToJobConfig.java new file mode 100644 index 0000000..939a87a --- /dev/null +++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ToJobConfig.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc.oracle.configuration; + +import org.apache.sqoop.model.ConfigClass; +import org.apache.sqoop.model.Input; +import org.apache.sqoop.model.Validator; +import org.apache.sqoop.validation.validators.NotEmpty; + +/** + * Holds the {@code @Input} fields of the TO-job configuration: table name, template table, partitioned and nologging flags, update key and update-merge flag, drop-table-if-exists flag, storage and temporary-storage clauses, append-values hint and parallel flag. Only {@code tableName} declares a validator ({@code NotEmpty}). + */ +@ConfigClass +public class ToJobConfig { + + @Input(size = 2000, validators = { @Validator(NotEmpty.class)}) + public String tableName; + + @Input(size = 2000) + public String templateTable; + + @Input + public Boolean partitioned; + + @Input + public Boolean nologging; + + @Input(size = 2000) + public String updateKey; + + @Input + public Boolean updateMerge; + + @Input + public Boolean dropTableIfExists; + + @Input(size = 2000) + public String storageClause; + + @Input(size = 2000) + public String temporaryStorageClause; + + @Input(size = 2000) + public String appendValuesHint; + + @Input + public Boolean parallel; + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ToJobConfiguration.java ---------------------------------------------------------------------- diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ToJobConfiguration.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ToJobConfiguration.java new file mode 100644 index 0000000..b34df1a --- /dev/null +++
b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ToJobConfiguration.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc.oracle.configuration; + +import org.apache.sqoop.model.ConfigurationClass; +import org.apache.sqoop.model.Config; + +/** + * Configuration class with a single {@code @Config} member, {@code toJobConfig}, which the no-argument constructor initializes to a new {@code ToJobConfig}. + */ +@ConfigurationClass +public class ToJobConfiguration { + @Config public ToJobConfig toJobConfig; + + public ToJobConfiguration() { + toJobConfig = new ToJobConfig(); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleActiveInstance.java ---------------------------------------------------------------------- diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleActiveInstance.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleActiveInstance.java new file mode 100644 index 0000000..b46bce5 --- /dev/null +++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleActiveInstance.java @@ -0,0 +1,44 @@ +/**
+ * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.connector.jdbc.oracle.util; + +/** + * Wraps data from v$active_instances: a mutable holder for one instance name and its host name, each exposed through a plain getter and setter with no validation. + */ +public class OracleActiveInstance { + + private String instanceName; + private String hostName; + + public String getInstanceName() { + return instanceName; + } + + public void setInstanceName(String newInstanceName) { + this.instanceName = newInstanceName; + } + + public String getHostName() { + return hostName; + } + + public void setHostName(String newHostName) { + this.hostName = newHostName; + } +}
