http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java new file mode 100644 index 0000000..816821e --- /dev/null +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.MutableContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.jdbc.util.SqlTypesUtils;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.InitializerContext;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.utils.ClassUtils;
+
+/**
+ * Initializer for jobs configured with {@link ToJobConfiguration}: checks the
+ * table/SQL/stage settings, stores the resulting data SQL in the context and
+ * derives the {@link Schema} of the configured table.
+ *
+ * Every entry point that creates a {@link GenericJdbcExecutor} also closes it.
+ */
+public class GenericJdbcToInitializer extends Initializer<ConnectionConfiguration, ToJobConfiguration> {
+
+  private GenericJdbcExecutor executor;
+  private static final Logger LOG =
+    Logger.getLogger(GenericJdbcToInitializer.class);
+
+  /**
+   * Creates an executor, configures the data SQL in the context and closes the
+   * executor again, whether or not the configuration step succeeded.
+   */
+  @Override
+  public void initialize(InitializerContext context, ConnectionConfiguration connection, ToJobConfiguration job) {
+    configureJdbcProperties(context.getContext(), connection, job);
+    try {
+      configureTableProperties(context.getContext(), connection, job);
+    } finally {
+      executor.close();
+    }
+  }
+
+  /**
+   * Returns a single-element list holding whatever
+   * {@code ClassUtils.jarForClass} reports for the configured JDBC driver.
+   */
+  @Override
+  public List<String> getJars(InitializerContext context, ConnectionConfiguration connection, ToJobConfiguration job) {
+    List<String> jars = new LinkedList<String>();
+
+    jars.add(ClassUtils.jarForClass(connection.connection.jdbcDriver));
+
+    return jars;
+  }
+
+  /**
+   * Builds the schema of the configured table from the result set metadata of
+   * a {@code SELECT * ... WHERE 1 = 0} query against it.
+   *
+   * The executor created for this lookup is closed on every path, success or
+   * failure.
+   *
+   * @return schema named after the (optionally schema-qualified) table, with
+   *         one column per result set column
+   * @throws SqoopException if no table name is configured
+   *         (GENERIC_JDBC_CONNECTOR_0019) or reading the metadata raises an
+   *         SQLException (GENERIC_JDBC_CONNECTOR_0016)
+   */
+  @Override
+  public Schema getSchema(InitializerContext context, ConnectionConfiguration connectionConfiguration, ToJobConfiguration toJobConfiguration) {
+    configureJdbcProperties(context.getContext(), connectionConfiguration, toJobConfiguration);
+
+    ResultSet rs = null;
+    try {
+      String tableName = toJobConfiguration.table.tableName;
+      if (tableName == null) {
+        throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0019,
+          "Table name extraction not supported yet.");
+      }
+
+      // Qualify the table with its schema when one is configured.
+      String qualifiedName = tableName;
+      if (toJobConfiguration.table.schemaName != null) {
+        qualifiedName = toJobConfiguration.table.schemaName + "." + tableName;
+      }
+
+      Schema schema = new Schema(qualifiedName);
+
+      // NOTE(review): the name is concatenated undelimited here, whereas
+      // configureTableProperties() passes identifiers through
+      // executor.delimitIdentifier() - confirm which form is intended.
+      rs = executor.executeQuery("SELECT * FROM " + qualifiedName + " WHERE 1 = 0");
+
+      ResultSetMetaData metaData = rs.getMetaData();
+      for (int i = 1; i <= metaData.getColumnCount(); i++) {
+        Column column = SqlTypesUtils.sqlTypeToAbstractType(metaData.getColumnType(i));
+
+        // Fall back to the label, then to a positional name, when the
+        // driver reports no column name.
+        String columnName = metaData.getColumnName(i);
+        if (columnName == null || columnName.equals("")) {
+          columnName = metaData.getColumnLabel(i);
+          if (null == columnName) {
+            columnName = "Column " + i;
+          }
+        }
+
+        column.setName(columnName);
+        schema.addColumn(column);
+      }
+
+      return schema;
+    } catch (SQLException e) {
+      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0016, e);
+    } finally {
+      if (rs != null) {
+        try {
+          rs.close();
+        } catch (SQLException e) {
+          LOG.info("Ignoring exception while closing ResultSet", e);
+        }
+      }
+      // Each call creates a fresh executor, so it must be closed here;
+      // otherwise every schema lookup leaves one behind.
+      executor.close();
+    }
+  }
+
+  /**
+   * Creates the executor from the connection settings. Driver and connection
+   * string are only guarded by assertions, so they are checked solely when
+   * assertions are enabled.
+   */
+  private void configureJdbcProperties(MutableContext context, ConnectionConfiguration connectionConfig, ToJobConfiguration jobConfig) {
+    String driver = connectionConfig.connection.jdbcDriver;
+    String url = connectionConfig.connection.connectionString;
+    String username = connectionConfig.connection.username;
+    String password = connectionConfig.connection.password;
+
+    assert driver != null;
+    assert url != null;
+
+    executor = new GenericJdbcExecutor(driver, url, username, password);
+  }
+
+  /**
+   * Checks the table/SQL/stage settings and stores the resulting data SQL in
+   * the context under {@code CONNECTOR_TO_JDBC_DATA_SQL}.
+   *
+   * @throws SqoopException if both table name and SQL are given (0007),
+   *         neither is given (0008), the SQL lacks the parameter marker
+   *         (0013), columns are combined with SQL (0014), or the stage table
+   *         holds rows and is not to be cleared (0017)
+   */
+  private void configureTableProperties(MutableContext context, ConnectionConfiguration connectionConfig, ToJobConfiguration jobConfig) {
+    String dataSql;
+
+    String schemaName = jobConfig.table.schemaName;
+    String tableName = jobConfig.table.tableName;
+    String stageTableName = jobConfig.table.stageTableName;
+    // An unset (null) flag counts as false.
+    boolean clearStageTable = Boolean.TRUE.equals(jobConfig.table.clearStageTable);
+    final boolean stageEnabled =
+      stageTableName != null && stageTableName.length() > 0;
+    String tableSql = jobConfig.table.sql;
+    String tableColumns = jobConfig.table.columns;
+
+    if (tableName != null && tableSql != null) {
+      // when both table name and table sql are specified:
+      throw new SqoopException(
+        GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
+
+    } else if (tableName != null) {
+      // when table name is specified:
+      if (stageEnabled) {
+        LOG.info("Stage has been enabled.");
+        LOG.info("Use stageTable: " + stageTableName +
+          " with clearStageTable: " + clearStageTable);
+
+        if (clearStageTable) {
+          executor.deleteTableData(stageTableName);
+        } else {
+          long stageRowCount = executor.getTableRowCount(stageTableName);
+          if (stageRowCount > 0) {
+            throw new SqoopException(
+              GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0017);
+          }
+        }
+      }
+
+      // For databases that support schemas (e.g. postgresql).
+      final String tableInUse = stageEnabled ? stageTableName : tableName;
+      String fullTableName = (schemaName == null) ?
+        executor.delimitIdentifier(tableInUse) :
+        executor.delimitIdentifier(schemaName) +
+          "." + executor.delimitIdentifier(tableInUse);
+
+      if (tableColumns == null) {
+        // No column list configured: one placeholder per column of the table.
+        String[] columns = executor.getQueryColumns("SELECT * FROM "
+          + fullTableName + " WHERE 1 = 0");
+        dataSql = buildInsertSql(fullTableName, null, columns.length);
+      } else {
+        // One placeholder per comma-separated entry of the configured list.
+        String[] columns = StringUtils.split(tableColumns, ',');
+        dataSql = buildInsertSql(fullTableName, tableColumns, columns.length);
+      }
+    } else if (tableSql != null) {
+      // when table sql is specified:
+
+      if (tableSql.indexOf(
+          GenericJdbcConnectorConstants.SQL_PARAMETER_MARKER) == -1) {
+        // make sure parameter marker is in the specified sql
+        throw new SqoopException(
+          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0013);
+      }
+
+      if (tableColumns == null) {
+        dataSql = tableSql;
+      } else {
+        throw new SqoopException(
+          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0014);
+      }
+    } else {
+      // when neither are specified:
+      throw new SqoopException(
+        GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008);
+    }
+
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_TO_JDBC_DATA_SQL,
+      dataSql);
+  }
+
+  /**
+   * Builds {@code INSERT INTO table [(columns)] VALUES (?,...)}.
+   *
+   * @param fullTableName table name exactly as it should appear in the SQL
+   * @param columnList column list to place in parentheses after the table
+   *        name, or null to omit it
+   * @param columnCount number of placeholders; at least one is always emitted
+   */
+  private static String buildInsertSql(String fullTableName, String columnList, int columnCount) {
+    StringBuilder builder = new StringBuilder("INSERT INTO ");
+    builder.append(fullTableName);
+    if (columnList != null) {
+      builder.append(" (").append(columnList).append(")");
+    }
+    builder.append(" VALUES (?");
+    for (int i = 1; i < columnCount; i++) {
+      builder.append(",?");
+    }
+    builder.append(")");
+    return builder.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java index 0c5f6e1..92f70e2 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java @@ -18,9 +18,8 @@ package org.apache.sqoop.connector.jdbc; import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; -import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration; -import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration; -import org.apache.sqoop.model.MJob; +import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration; +import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration; import org.apache.sqoop.validation.Status; import org.apache.sqoop.validation.Validation; import org.apache.sqoop.validation.Validator; @@ -67,20 +66,13 @@ public class GenericJdbcValidator extends Validator { } @Override - public Validation validateJob(MJob.Type type, Object jobConfiguration) { - switch(type) { - case IMPORT: - return validateImportJob(jobConfiguration); - case EXPORT: - return validateExportJob(jobConfiguration); - default: - return super.validateJob(type, jobConfiguration); - } + public Validation validateJob(Object jobConfiguration) { + return super.validateJob(jobConfiguration); } private Validation validateExportJob(Object jobConfiguration) { - Validation validation = new Validation(ExportJobConfiguration.class); - ExportJobConfiguration configuration = 
(ExportJobConfiguration)jobConfiguration; + Validation validation = new Validation(ToJobConfiguration.class); + ToJobConfiguration configuration = (ToJobConfiguration)jobConfiguration; if(configuration.table.tableName == null && configuration.table.sql == null) { validation.addMessage(Status.UNACCEPTABLE, "table", "Either table name or SQL must be specified"); @@ -104,8 +96,8 @@ public class GenericJdbcValidator extends Validator { } private Validation validateImportJob(Object jobConfiguration) { - Validation validation = new Validation(ImportJobConfiguration.class); - ImportJobConfiguration configuration = (ImportJobConfiguration)jobConfiguration; + Validation validation = new Validation(FromJobConfiguration.class); + FromJobConfiguration configuration = (FromJobConfiguration)jobConfiguration; if(configuration.table.tableName == null && configuration.table.sql == null) { validation.addMessage(Status.UNACCEPTABLE, "table", "Either table name or SQL must be specified"); http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportJobConfiguration.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportJobConfiguration.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportJobConfiguration.java deleted file mode 100644 index f2b2d65..0000000 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportJobConfiguration.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.connector.jdbc.configuration; - -import org.apache.sqoop.model.ConfigurationClass; -import org.apache.sqoop.model.Form; - -/** - * - */ -@ConfigurationClass -public class ExportJobConfiguration { - @Form public ExportTableForm table; - - public ExportJobConfiguration() { - table = new ExportTableForm(); - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java deleted file mode 100644 index 14a7033..0000000 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.connector.jdbc.configuration; - -import org.apache.sqoop.model.FormClass; -import org.apache.sqoop.model.Input; - -/** - * - */ -@FormClass -public class ExportTableForm { - @Input(size = 50) public String schemaName; - @Input(size = 2000) public String tableName; - @Input(size = 50) public String sql; - @Input(size = 50) public String columns; - @Input(size = 2000) public String stageTableName; - @Input public Boolean clearStageTable; -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromJobConfiguration.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromJobConfiguration.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromJobConfiguration.java new file mode 100644 index 0000000..bd1c4be --- /dev/null +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromJobConfiguration.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc.configuration; + +import org.apache.sqoop.model.ConfigurationClass; +import org.apache.sqoop.model.Form; + +/** + * + */ +@ConfigurationClass +public class FromJobConfiguration { + @Form public FromTableForm table; + + public FromJobConfiguration() { + table = new FromTableForm(); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromTableForm.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromTableForm.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromTableForm.java new file mode 100644 index 0000000..8f6fb60 --- /dev/null +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromTableForm.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc.configuration; + +import org.apache.sqoop.model.FormClass; +import org.apache.sqoop.model.Input; + +/** + * + */ +@FormClass +public class FromTableForm { + @Input(size = 50) public String schemaName; + @Input(size = 50) public String tableName; + @Input(size = 2000) public String sql; + @Input(size = 50) public String columns; + @Input(size = 50) public String partitionColumn; + @Input public Boolean partitionColumnNull; + @Input(size = 50) public String boundaryQuery; +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportJobConfiguration.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportJobConfiguration.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportJobConfiguration.java deleted file mode 100644 index f3c1d13..0000000 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportJobConfiguration.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.connector.jdbc.configuration; - -import org.apache.sqoop.model.ConfigurationClass; -import org.apache.sqoop.model.Form; - -/** - * - */ -@ConfigurationClass -public class ImportJobConfiguration { - @Form public ImportTableForm table; - - public ImportJobConfiguration() { - table = new ImportTableForm(); - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java deleted file mode 100644 index 0991b28..0000000 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.connector.jdbc.configuration; - -import org.apache.sqoop.model.FormClass; -import org.apache.sqoop.model.Input; - -/** - * - */ -@FormClass -public class ImportTableForm { - @Input(size = 50) public String schemaName; - @Input(size = 50) public String tableName; - @Input(size = 2000) public String sql; - @Input(size = 50) public String columns; - @Input(size = 50) public String partitionColumn; - @Input public Boolean partitionColumnNull; - @Input(size = 50) public String boundaryQuery; -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ToJobConfiguration.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ToJobConfiguration.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ToJobConfiguration.java new file mode 100644 index 0000000..a0f837e --- /dev/null +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ToJobConfiguration.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc.configuration; + +import org.apache.sqoop.model.ConfigurationClass; +import org.apache.sqoop.model.Form; + +/** + * + */ +@ConfigurationClass +public class ToJobConfiguration { + @Form public ToTableForm table; + + public ToJobConfiguration() { + table = new ToTableForm(); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ToTableForm.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ToTableForm.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ToTableForm.java new file mode 100644 index 0000000..dca0bf9 --- /dev/null +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ToTableForm.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc.configuration; + +import org.apache.sqoop.model.FormClass; +import org.apache.sqoop.model.Input; + +/** + * + */ +@FormClass +public class ToTableForm { + @Input(size = 50) public String schemaName; + @Input(size = 2000) public String tableName; + @Input(size = 50) public String sql; + @Input(size = 50) public String columns; + @Input(size = 2000) public String stageTableName; + @Input public Boolean clearStageTable; +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java index 3c5ca39..73106ab 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java @@ -22,7 +22,7 @@ import org.apache.sqoop.common.MutableContext; import org.apache.sqoop.common.MutableMapContext; import org.apache.sqoop.common.SqoopException; import 
org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; -import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration; +//import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration; import org.apache.sqoop.job.etl.Initializer; import org.apache.sqoop.job.etl.InitializerContext; import org.apache.sqoop.model.MJob; http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java index 5b7a1e3..420e3ad 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java @@ -27,7 +27,7 @@ import java.util.Collection; import org.apache.sqoop.common.MutableContext; import org.apache.sqoop.common.MutableMapContext; import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; -import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration; +//import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration; import org.apache.sqoop.etl.io.DataReader; import org.apache.sqoop.job.etl.Loader; import org.apache.sqoop.job.etl.LoaderContext; http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java 
b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java index 9130375..8ded5a4 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java @@ -22,7 +22,7 @@ import junit.framework.TestCase; import org.apache.sqoop.common.MutableContext; import org.apache.sqoop.common.MutableMapContext; import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; -import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration; +//import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration; import org.apache.sqoop.job.etl.Extractor; import org.apache.sqoop.job.etl.ExtractorContext; import org.apache.sqoop.etl.io.DataWriter; http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java index 15c38aa..c5eb852 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java @@ -24,7 +24,7 @@ import junit.framework.TestCase; import org.apache.sqoop.common.MutableContext; import org.apache.sqoop.common.MutableMapContext; import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; -import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration; +//import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration; import 
org.apache.sqoop.job.Constants; import org.apache.sqoop.job.etl.Initializer; import org.apache.sqoop.job.etl.InitializerContext; http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java index 5b574c8..b48931c 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java @@ -31,7 +31,7 @@ import org.apache.sqoop.common.MutableContext; import org.apache.sqoop.common.MutableMapContext; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; -import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration; +//import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration; import org.apache.sqoop.job.Constants; import org.apache.sqoop.job.etl.Partition; import org.apache.sqoop.job.etl.Partitioner; http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-mysql-jdbc/src/main/java/org/apache/sqoop/connector/mysqljdbc/MySqlJdbcConnector.java ---------------------------------------------------------------------- diff --git a/connector/connector-mysql-jdbc/src/main/java/org/apache/sqoop/connector/mysqljdbc/MySqlJdbcConnector.java b/connector/connector-mysql-jdbc/src/main/java/org/apache/sqoop/connector/mysqljdbc/MySqlJdbcConnector.java index 17215f0..346b625 100644 --- a/connector/connector-mysql-jdbc/src/main/java/org/apache/sqoop/connector/mysqljdbc/MySqlJdbcConnector.java +++ 
b/connector/connector-mysql-jdbc/src/main/java/org/apache/sqoop/connector/mysqljdbc/MySqlJdbcConnector.java @@ -23,8 +23,8 @@ import java.util.List; import java.util.Locale; import java.util.ResourceBundle; -import org.apache.sqoop.job.etl.Exporter; -import org.apache.sqoop.job.etl.Importer; +import org.apache.sqoop.job.etl.From; +import org.apache.sqoop.job.etl.To; import org.apache.sqoop.model.MConnectionForms; import org.apache.sqoop.model.MForm; import org.apache.sqoop.connector.spi.SqoopConnector; @@ -53,13 +53,13 @@ public class MySqlJdbcConnector implements SqoopConnector { } @Override - public Importer getImporter() { + public From getImporter() { // TODO Auto-generated method stub return null; } @Override - public Exporter getExporter() { + public To getExporter() { // TODO Auto-generated method stub return null; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java b/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java index b80de7f..ca4b253 100644 --- a/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java +++ b/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java @@ -19,18 +19,16 @@ package org.apache.sqoop.connector; import java.io.IOException; import java.net.URL; -import java.util.LinkedList; -import java.util.List; import java.util.Properties; import org.apache.log4j.Logger; +import org.apache.sqoop.common.ConnectorType; import org.apache.sqoop.core.ConfigurationConstants; import org.apache.sqoop.model.FormUtils; import org.apache.sqoop.model.MConnectionForms; import org.apache.sqoop.model.MConnector; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.spi.SqoopConnector; -import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MJobForms; public 
final class ConnectorHandler { @@ -93,21 +91,19 @@ public final class ConnectorHandler { } // Initialize Metadata - List<MJobForms> jobForms = new LinkedList<MJobForms>(); - for(MJob.Type type : MJob.Type.values()) { - Class klass = connector.getJobConfigurationClass(type); - if(klass != null) { - jobForms.add(new MJobForms(type, FormUtils.toForms(klass))); - } - } - + MJobForms fromJobForms = new MJobForms(FormUtils.toForms( + connector.getJobConfigurationClass(ConnectorType.FROM))); MConnectionForms connectionForms = new MConnectionForms( FormUtils.toForms(connector.getConnectionConfigurationClass())); + MJobForms toJobForms = new MJobForms(FormUtils.toForms( + connector.getJobConfigurationClass(ConnectorType.TO))); + MConnectionForms toConnectionForms = new MConnectionForms( + FormUtils.toForms(connector.getConnectionConfigurationClass())); String connectorVersion = connector.getVersion(); - mConnector = new MConnector(connectorUniqueName, connectorClassName, - connectorVersion, connectionForms, jobForms); + mConnector = new MConnector(connectorUniqueName, connectorClassName, connectorVersion, + connectionForms, fromJobForms, toJobForms); if (LOG.isInfoEnabled()) { LOG.info("Connector [" + connectorClassName + "] initialized."); http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java index f43942d..96ec148 100644 --- a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java +++ b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java @@ -52,15 +52,9 @@ public abstract class ExecutionEngine { } /** - * Prepare given submission request for import job type. + * Prepare given submission request. 
* * @param request Submission request */ - public abstract void prepareImportSubmission(SubmissionRequest request); - - /** - * Prepare given submission request for export job type.. - * @param request - */ - public abstract void prepareExportSubmission(SubmissionRequest request); + public abstract void prepareSubmission(SubmissionRequest request); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java index 505121c..81e1147 100644 --- a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java +++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java @@ -24,14 +24,11 @@ import org.apache.sqoop.core.Reconfigurable; import org.apache.sqoop.core.SqoopConfiguration; import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener; import org.apache.sqoop.framework.configuration.ConnectionConfiguration; -import org.apache.sqoop.framework.configuration.ExportJobConfiguration; -import org.apache.sqoop.framework.configuration.ImportJobConfiguration; +import org.apache.sqoop.framework.configuration.JobConfiguration; import org.apache.sqoop.model.*; import org.apache.sqoop.repository.RepositoryManager; import org.apache.sqoop.validation.Validator; -import java.util.LinkedList; -import java.util.List; import java.util.Locale; import java.util.ResourceBundle; @@ -113,31 +110,20 @@ public class FrameworkManager implements Reconfigurable { public static final String CURRENT_FRAMEWORK_VERSION = "1"; - public Class getJobConfigurationClass(MJob.Type jobType) { - switch (jobType) { - case IMPORT: - return ImportJobConfiguration.class; - case EXPORT: - return ExportJobConfiguration.class; - default: - return null; - } + public Class 
getJobConfigurationClass() { + return JobConfiguration.class; + } + + public Class getConnectionConfigurationClass() { + return ConnectionConfiguration.class; } - public Class getConnectionConfigurationClass() { - return ConnectionConfiguration.class; - } public FrameworkManager() { MConnectionForms connectionForms = new MConnectionForms( FormUtils.toForms(getConnectionConfigurationClass()) ); - List<MJobForms> jobForms = new LinkedList<MJobForms>(); - jobForms.add(new MJobForms(MJob.Type.IMPORT, - FormUtils.toForms(getJobConfigurationClass(MJob.Type.IMPORT)))); - jobForms.add(new MJobForms(MJob.Type.EXPORT, - FormUtils.toForms(getJobConfigurationClass(MJob.Type.EXPORT)))); - mFramework = new MFramework(connectionForms, jobForms, - CURRENT_FRAMEWORK_VERSION); + mFramework = new MFramework(connectionForms, new MJobForms(FormUtils.toForms(getJobConfigurationClass())), + CURRENT_FRAMEWORK_VERSION); // Build validator validator = new FrameworkValidator(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java index f5f6a36..f19a23e 100644 --- a/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java +++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java @@ -18,13 +18,11 @@ package org.apache.sqoop.framework; import org.apache.sqoop.framework.configuration.ConnectionConfiguration; -import org.apache.sqoop.framework.configuration.ExportJobConfiguration; -import org.apache.sqoop.framework.configuration.ImportJobConfiguration; import org.apache.sqoop.framework.configuration.InputForm; +import org.apache.sqoop.framework.configuration.JobConfiguration; import org.apache.sqoop.framework.configuration.OutputCompression; import 
org.apache.sqoop.framework.configuration.OutputForm; import org.apache.sqoop.framework.configuration.ThrottlingForm; -import org.apache.sqoop.model.MJob; import org.apache.sqoop.validation.Status; import org.apache.sqoop.validation.Validation; import org.apache.sqoop.validation.Validator; @@ -43,61 +41,57 @@ public class FrameworkValidator extends Validator { @Override - public Validation validateJob(MJob.Type type, Object jobConfiguration) { - switch(type) { - case IMPORT: - return validateImportJob(jobConfiguration); - case EXPORT: - return validateExportJob(jobConfiguration); - default: - return super.validateJob(type, jobConfiguration); - } - } - - private Validation validateExportJob(Object jobConfiguration) { - Validation validation = new Validation(ExportJobConfiguration.class); - ExportJobConfiguration configuration = (ExportJobConfiguration)jobConfiguration; - - validateInputForm(validation, configuration.input); - validateThrottingForm(validation, configuration.throttling); - - return validation; - } - - private Validation validateImportJob(Object jobConfiguration) { - Validation validation = new Validation(ImportJobConfiguration.class); - ImportJobConfiguration configuration = (ImportJobConfiguration)jobConfiguration; - - validateOutputForm(validation, configuration.output); + public Validation validateJob(Object jobConfiguration) { + JobConfiguration configuration = (JobConfiguration)jobConfiguration; + Validation validation = new Validation(JobConfiguration.class); validateThrottingForm(validation, configuration.throttling); - - return validation; + return super.validateJob(jobConfiguration); } - private void validateInputForm(Validation validation, InputForm input) { - if(input.inputDirectory == null || input.inputDirectory.isEmpty()) { - validation.addMessage(Status.UNACCEPTABLE, "input", "inputDirectory", "Input directory is empty"); - } - } +// private Validation validateExportJob(Object jobConfiguration) { +// Validation validation = new 
Validation(ExportJobConfiguration.class); +// ExportJobConfiguration configuration = (ExportJobConfiguration)jobConfiguration; +// +// validateInputForm(validation, configuration.input); +// validateThrottingForm(validation, configuration.throttling); +// +// return validation; +// } +// +// private Validation validateImportJob(Object jobConfiguration) { +// Validation validation = new Validation(ImportJobConfiguration.class); +// ImportJobConfiguration configuration = (ImportJobConfiguration)jobConfiguration; +// +// validateOutputForm(validation, configuration.output); +// validateThrottingForm(validation, configuration.throttling); +// +// return validation; +// } - private void validateOutputForm(Validation validation, OutputForm output) { - if(output.outputDirectory == null || output.outputDirectory.isEmpty()) { - validation.addMessage(Status.UNACCEPTABLE, "output", "outputDirectory", "Output directory is empty"); - } - if(output.customCompression != null && - output.customCompression.trim().length() > 0 && - output.compression != OutputCompression.CUSTOM) { - validation.addMessage(Status.UNACCEPTABLE, "output", "compression", - "custom compression should be blank as " + output.compression + " is being used."); - } - if(output.compression == OutputCompression.CUSTOM && - (output.customCompression == null || - output.customCompression.trim().length() == 0) - ) { - validation.addMessage(Status.UNACCEPTABLE, "output", "compression", - "custom compression is blank."); - } - } +// private void validateInputForm(Validation validation, InputForm input) { +// if(input.inputDirectory == null || input.inputDirectory.isEmpty()) { +// validation.addMessage(Status.UNACCEPTABLE, "input", "inputDirectory", "Input directory is empty"); +// } +// } +// +// private void validateOutputForm(Validation validation, OutputForm output) { +// if(output.outputDirectory == null || output.outputDirectory.isEmpty()) { +// validation.addMessage(Status.UNACCEPTABLE, "output", 
"outputDirectory", "Output directory is empty"); +// } +// if(output.customCompression != null && +// output.customCompression.trim().length() > 0 && +// output.compression != OutputCompression.CUSTOM) { +// validation.addMessage(Status.UNACCEPTABLE, "output", "compression", +// "custom compression should be blank as " + output.compression + " is being used."); +// } +// if(output.compression == OutputCompression.CUSTOM && +// (output.customCompression == null || +// output.customCompression.trim().length() == 0) +// ) { +// validation.addMessage(Status.UNACCEPTABLE, "output", "compression", +// "custom compression is blank."); +// } +// } private void validateThrottingForm(Validation validation, ThrottlingForm throttling) { if(throttling.extractors != null && throttling.extractors < 1) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/core/src/main/java/org/apache/sqoop/framework/JobManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java index 1700432..e0bf011 100644 --- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java +++ b/core/src/main/java/org/apache/sqoop/framework/JobManager.java @@ -18,17 +18,17 @@ package org.apache.sqoop.framework; import org.apache.log4j.Logger; +import org.apache.sqoop.common.ConnectorType; import org.apache.sqoop.common.MapContext; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.ConnectorManager; +import org.apache.sqoop.framework.configuration.JobConfiguration; import org.apache.sqoop.request.HttpEventContext; import org.apache.sqoop.connector.idf.IntermediateDataFormat; import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.core.Reconfigurable; import org.apache.sqoop.core.SqoopConfiguration; import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener; -import 
org.apache.sqoop.framework.configuration.ExportJobConfiguration; -import org.apache.sqoop.framework.configuration.ImportJobConfiguration; import org.apache.sqoop.job.etl.*; import org.apache.sqoop.model.FormUtils; import org.apache.sqoop.model.MConnection; @@ -280,34 +280,52 @@ public class JobManager implements Reconfigurable { "Job id: " + job.getPersistenceId()); } - MConnection connection = repository.findConnection(job.getConnectionId()); + MConnection fromConnection = repository.findConnection(job.getConnectionId(ConnectorType.FROM)); + MConnection toConnection = repository.findConnection(job.getConnectionId(ConnectorType.TO)); - if (!connection.getEnabled()) { + if (!fromConnection.getEnabled()) { throw new SqoopException(FrameworkError.FRAMEWORK_0010, - "Connection id: " + connection.getPersistenceId()); + "Connection id: " + fromConnection.getPersistenceId()); } - SqoopConnector connector = - ConnectorManager.getInstance().getConnector(job.getConnectorId()); + SqoopConnector fromConnector = + ConnectorManager.getInstance().getConnector(job.getConnectorId(ConnectorType.FROM)); + SqoopConnector toConnector = + ConnectorManager.getInstance().getConnector(job.getConnectorId(ConnectorType.TO)); - // Transform forms to connector specific classes - Object connectorConnection = ClassUtils.instantiate( - connector.getConnectionConfigurationClass()); - FormUtils.fromForms(connection.getConnectorPart().getForms(), - connectorConnection); + // Transform forms to fromConnector specific classes + Object fromConnectorConnection = ClassUtils.instantiate( + fromConnector.getConnectionConfigurationClass()); + FormUtils.fromForms(fromConnection.getConnectorPart().getForms(), + fromConnectorConnection); - Object connectorJob = ClassUtils.instantiate( - connector.getJobConfigurationClass(job.getType())); - FormUtils.fromForms(job.getConnectorPart().getForms(), connectorJob); + Object fromJob = ClassUtils.instantiate( + 
fromConnector.getJobConfigurationClass(ConnectorType.FROM)); + FormUtils.fromForms( + job.getConnectorPart(ConnectorType.FROM).getForms(), fromJob); + + // Transform forms to toConnector specific classes + Object toConnectorConnection = ClassUtils.instantiate( + toConnector.getConnectionConfigurationClass()); + FormUtils.fromForms(toConnection.getConnectorPart().getForms(), + toConnectorConnection); + + Object toJob = ClassUtils.instantiate( + toConnector.getJobConfigurationClass(ConnectorType.TO)); + FormUtils.fromForms(job.getConnectorPart(ConnectorType.TO).getForms(), toJob); // Transform framework specific forms - Object frameworkConnection = ClassUtils.instantiate( + Object fromFrameworkConnection = ClassUtils.instantiate( FrameworkManager.getInstance().getConnectionConfigurationClass()); - FormUtils.fromForms(connection.getFrameworkPart().getForms(), - frameworkConnection); + Object toFrameworkConnection = ClassUtils.instantiate( + FrameworkManager.getInstance().getConnectionConfigurationClass()); + FormUtils.fromForms(fromConnection.getFrameworkPart().getForms(), + fromFrameworkConnection); + FormUtils.fromForms(toConnection.getFrameworkPart().getForms(), + toFrameworkConnection); Object frameworkJob = ClassUtils.instantiate( - FrameworkManager.getInstance().getJobConfigurationClass(job.getType())); + FrameworkManager.getInstance().getJobConfigurationClass()); FormUtils.fromForms(job.getFrameworkPart().getForms(), frameworkJob); // Create request object @@ -319,12 +337,16 @@ public class JobManager implements Reconfigurable { // Save important variables to the submission request request.setSummary(summary); - request.setConnector(connector); - request.setConfigConnectorConnection(connectorConnection); - request.setConfigConnectorJob(connectorJob); - request.setConfigFrameworkConnection(frameworkConnection); + request.setConnector(ConnectorType.FROM, fromConnector); + request.setConnector(ConnectorType.TO, toConnector); + 
request.setConnectorConnectionConfig(ConnectorType.FROM, fromConnectorConnection); + request.setConnectorConnectionConfig(ConnectorType.TO, toConnectorConnection); + request.setConnectorJobConfig(ConnectorType.FROM, fromJob); + request.setConnectorJobConfig(ConnectorType.TO, toJob); + // @TODO(Abe): Should we actually have 2 different Framework Connection config objects? + request.setFrameworkConnectionConfig(ConnectorType.FROM, fromFrameworkConnection); + request.setFrameworkConnectionConfig(ConnectorType.TO, toFrameworkConnection); request.setConfigFrameworkJob(frameworkJob); - request.setJobType(job.getType()); request.setJobName(job.getName()); request.setJobId(job.getPersistenceId()); request.setNotificationUrl(notificationBaseUrl + jobId); @@ -342,8 +364,9 @@ public class JobManager implements Reconfigurable { request.addJarForClass(SqoopConnector.class); // Execution engine jar request.addJarForClass(executionEngine.getClass()); - // Connector in use - request.addJarForClass(connector.getClass()); + // Connectors in use + request.addJarForClass(fromConnector.getClass()); + request.addJarForClass(toConnector.getClass()); // Extra libraries that Sqoop code requires request.addJarForClass(JSONValue.class); @@ -351,67 +374,94 @@ public class JobManager implements Reconfigurable { // The IDF is used in the ETL process. 
request.addJarForClass(dataFormatClass); - // Get connector callbacks - switch (job.getType()) { - case IMPORT: - request.setConnectorCallbacks(connector.getImporter()); - break; - case EXPORT: - request.setConnectorCallbacks(connector.getExporter()); - break; - default: - throw new SqoopException(FrameworkError.FRAMEWORK_0005, - "Unsupported job type " + job.getType().name()); - } - LOG.debug("Using callbacks: " + request.getConnectorCallbacks()); - // Initialize submission from connector perspective - CallbackBase baseCallbacks = request.getConnectorCallbacks(); + // Get callbacks + request.setFromCallback(fromConnector.getFrom()); + request.setToCallback(toConnector.getTo()); + LOG.debug("Using callbacks: " + request.getFromCallback() + ", " + request.getToCallback()); + + // Initialize submission from fromConnector perspective + CallbackBase[] baseCallbacks = { + request.getFromCallback(), + request.getToCallback() + }; - Class<? extends Initializer> initializerClass = baseCallbacks - .getInitializer(); - Initializer initializer = (Initializer) ClassUtils - .instantiate(initializerClass); + CallbackBase baseCallback; + Class<? extends Initializer> initializerClass; + Initializer initializer; + InitializerContext initializerContext; + + // Initialize From Connector callback. 
+ baseCallback = request.getFromCallback(); + + initializerClass = baseCallback + .getInitializer(); + initializer = (Initializer) ClassUtils + .instantiate(initializerClass); if (initializer == null) { throw new SqoopException(FrameworkError.FRAMEWORK_0006, - "Can't create initializer instance: " + initializerClass.getName()); + "Can't create initializer instance: " + initializerClass.getName()); } // Initializer context - InitializerContext initializerContext = new InitializerContext( - request.getConnectorContext()); + initializerContext = new InitializerContext(request.getConnectorContext(ConnectorType.FROM)); - // Initialize submission from connector perspective + // Initialize submission from fromConnector perspective initializer.initialize(initializerContext, - request.getConfigConnectorConnection(), - request.getConfigConnectorJob()); + request.getConnectorConnectionConfig(ConnectorType.FROM), + request.getConnectorJobConfig(ConnectorType.FROM)); // Add job specific jars to request.addJars(initializer.getJars(initializerContext, - request.getConfigConnectorConnection(), - request.getConfigConnectorJob())); + request.getConnectorConnectionConfig(ConnectorType.FROM), + request.getConnectorJobConfig(ConnectorType.FROM))); + // @TODO(Abe): Alter behavior of Schema here. Need from Schema. 
// Retrieve and persist the schema request.getSummary().setConnectorSchema(initializer.getSchema( - initializerContext, - request.getConfigConnectorConnection(), - request.getConfigConnectorJob() - )); + initializerContext, + request.getConnectorConnectionConfig(ConnectorType.FROM), + request.getConnectorJobConfig(ConnectorType.FROM) + )); - // Bootstrap job from framework perspective - switch (job.getType()) { - case IMPORT: - prepareImportSubmission(request); - break; - case EXPORT: - prepareExportSubmission(request); - break; - default: - throw new SqoopException(FrameworkError.FRAMEWORK_0005, - "Unsupported job type " + job.getType().name()); + // Initialize To Connector callback. + baseCallback = request.getToCallback(); + + initializerClass = baseCallback + .getInitializer(); + initializer = (Initializer) ClassUtils + .instantiate(initializerClass); + + if (initializer == null) { + throw new SqoopException(FrameworkError.FRAMEWORK_0006, + "Can't create initializer instance: " + initializerClass.getName()); } + // Initializer context + initializerContext = new InitializerContext(request.getConnectorContext(ConnectorType.TO)); + + // Initialize submission from fromConnector perspective + initializer.initialize(initializerContext, + request.getConnectorConnectionConfig(ConnectorType.TO), + request.getConnectorJobConfig(ConnectorType.TO)); + + // Add job specific jars to + request.addJars(initializer.getJars(initializerContext, + request.getConnectorConnectionConfig(ConnectorType.TO), + request.getConnectorJobConfig(ConnectorType.TO))); + + // @TODO(Abe): Alter behavior of Schema here. Need To Schema. 
+ // Retrieve and persist the schema +// request.getSummary().setConnectorSchema(initializer.getSchema( +// initializerContext, +// request.getConnectorConnectionConfig(ConnectorType.TO), +// request.getConnectorJobConfig(ConnectorType.TO) +// )); + + // Bootstrap job from framework perspective + prepareSubmission(request); + // Make sure that this job id is not currently running and submit the job // only if it's not. synchronized (getClass()) { @@ -421,6 +471,7 @@ public class JobManager implements Reconfigurable { "Job with id " + jobId); } + // @TODO(Abe): Call multiple destroyers. // TODO(jarcec): We might need to catch all exceptions here to ensure // that Destroyer will be executed in all cases. boolean submitted = submissionEngine.submit(request); @@ -436,12 +487,9 @@ public class JobManager implements Reconfigurable { return summary; } - private void prepareImportSubmission(SubmissionRequest request) { - ImportJobConfiguration jobConfiguration = (ImportJobConfiguration) request - .getConfigFrameworkJob(); - - // Initialize the map-reduce part (all sort of required classes, ...) - request.setOutputDirectory(jobConfiguration.output.outputDirectory); + private void prepareSubmission(SubmissionRequest request) { + JobConfiguration jobConfiguration = (JobConfiguration) request + .getConfigFrameworkJob(); // We're directly moving configured number of extractors and loaders to // underlying request object. In the future we might need to throttle this @@ -450,21 +498,7 @@ public class JobManager implements Reconfigurable { request.setLoaders(jobConfiguration.throttling.loaders); // Delegate rest of the job to execution engine - executionEngine.prepareImportSubmission(request); - } - - private void prepareExportSubmission(SubmissionRequest request) { - ExportJobConfiguration jobConfiguration = (ExportJobConfiguration) request - .getConfigFrameworkJob(); - - // We're directly moving configured number of extractors and loaders to - // underlying request object. 
In the future we might need to throttle this - // count based on other running jobs to meet our SLAs. - request.setExtractors(jobConfiguration.throttling.extractors); - request.setLoaders(jobConfiguration.throttling.loaders); - - // Delegate rest of the job to execution engine - executionEngine.prepareExportSubmission(request); + executionEngine.prepareSubmission(request); } /** @@ -472,23 +506,37 @@ public class JobManager implements Reconfigurable { * remote cluster. */ private void destroySubmission(SubmissionRequest request) { - CallbackBase baseCallbacks = request.getConnectorCallbacks(); + CallbackBase fromCallback = request.getFromCallback(); + CallbackBase toCallback = request.getToCallback(); - Class<? extends Destroyer> destroyerClass = baseCallbacks.getDestroyer(); - Destroyer destroyer = (Destroyer) ClassUtils.instantiate(destroyerClass); + Class<? extends Destroyer> fromDestroyerClass = fromCallback.getDestroyer(); + Class<? extends Destroyer> toDestroyerClass = toCallback.getDestroyer(); + Destroyer fromDestroyer = (Destroyer) ClassUtils.instantiate(fromDestroyerClass); + Destroyer toDestroyer = (Destroyer) ClassUtils.instantiate(toDestroyerClass); - if (destroyer == null) { + if (fromDestroyer == null) { throw new SqoopException(FrameworkError.FRAMEWORK_0006, - "Can't create destroyer instance: " + destroyerClass.getName()); + "Can't create toDestroyer instance: " + fromDestroyerClass.getName()); } - DestroyerContext destroyerContext = new DestroyerContext( - request.getConnectorContext(), false, request.getSummary() + if (toDestroyer == null) { + throw new SqoopException(FrameworkError.FRAMEWORK_0006, + "Can't create toDestroyer instance: " + toDestroyerClass.getName()); + } + + // @TODO(Abe): Update context to manage multiple connectors. As well as summary. 
+ DestroyerContext fromDestroyerContext = new DestroyerContext( + request.getConnectorContext(ConnectorType.FROM), false, request.getSummary() + .getConnectorSchema()); + DestroyerContext toDestroyerContext = new DestroyerContext( + request.getConnectorContext(ConnectorType.TO), false, request.getSummary() .getConnectorSchema()); // Initialize submission from connector perspective - destroyer.destroy(destroyerContext, request.getConfigConnectorConnection(), - request.getConfigConnectorJob()); + fromDestroyer.destroy(fromDestroyerContext, request.getConnectorConnectionConfig(ConnectorType.FROM), + request.getConnectorJobConfig(ConnectorType.FROM)); + toDestroyer.destroy(toDestroyerContext, request.getConnectorConnectionConfig(ConnectorType.TO), + request.getConnectorJobConfig(ConnectorType.TO)); } public MSubmission stop(long jobId, HttpEventContext ctx) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java index 7900eee..1645036 100644 --- a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java +++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java @@ -17,16 +17,18 @@ */ package org.apache.sqoop.framework; +import org.apache.sqoop.common.ConnectorType; import org.apache.sqoop.common.MutableMapContext; import org.apache.sqoop.connector.idf.IntermediateDataFormat; import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.job.etl.CallbackBase; -import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MSubmission; import org.apache.sqoop.utils.ClassUtils; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; /** * Submission details class is used when creating new 
submission and contains @@ -51,14 +53,9 @@ public class SubmissionRequest { long jobId; /** - * Job type - */ - MJob.Type jobType; - - /** * Connector instance associated with this submission request */ - SqoopConnector connector; + Map<ConnectorType, SqoopConnector > connectors; /** * List of required local jars for the job @@ -66,22 +63,27 @@ public class SubmissionRequest { List<String> jars; /** - * Base callbacks that are independent on job type + * From connector callback + */ + CallbackBase fromCallback; + + /** + * To connector callback */ - CallbackBase connectorCallbacks; + CallbackBase toCallback; /** - * All 4 configuration objects + * All configuration objects */ - Object configConnectorConnection; - Object configConnectorJob; - Object configFrameworkConnection; + Map<ConnectorType, Object> connectorConnectionConfigs; + Map<ConnectorType, Object> connectorJobConfigs; + Map<ConnectorType, Object> frameworkConnectionConfigs; Object configFrameworkJob; /** * Connector context (submission specific configuration) */ - MutableMapContext connectorContext; + Map<ConnectorType, MutableMapContext> connectorContexts; /** * Framework context (submission specific configuration) @@ -115,8 +117,17 @@ public class SubmissionRequest { public SubmissionRequest() { this.jars = new LinkedList<String>(); - this.connectorContext = new MutableMapContext(); + this.connectorContexts = new HashMap<ConnectorType, MutableMapContext>(); + + this.connectorContexts.put(ConnectorType.FROM, new MutableMapContext()); + this.connectorContexts.put(ConnectorType.TO, new MutableMapContext()); this.frameworkContext = new MutableMapContext(); + + this.connectorConnectionConfigs = new HashMap<ConnectorType, Object>(); + this.connectorJobConfigs = new HashMap<ConnectorType, Object>(); + this.frameworkConnectionConfigs = new HashMap<ConnectorType, Object>(); + + this.connectors = new HashMap<ConnectorType, SqoopConnector>(); } public MSubmission getSummary() { @@ -143,20 +154,12 @@ public class 
SubmissionRequest { this.jobId = jobId; } - public MJob.Type getJobType() { - return jobType; - } - - public void setJobType(MJob.Type jobType) { - this.jobType = jobType; + public SqoopConnector getConnector(ConnectorType type) { + return connectors.get(type); } - public SqoopConnector getConnector() { - return connector; - } - - public void setConnector(SqoopConnector connector) { - this.connector = connector; + public void setConnector(ConnectorType type, SqoopConnector connector) { + this.connectors.put(type, connector); } public List<String> getJars() { @@ -179,36 +182,44 @@ public class SubmissionRequest { } } - public CallbackBase getConnectorCallbacks() { - return connectorCallbacks; + public CallbackBase getFromCallback() { + return fromCallback; + } + + public void setFromCallback(CallbackBase fromCallback) { + this.fromCallback = fromCallback; + } + + public CallbackBase getToCallback() { + return toCallback; } - public void setConnectorCallbacks(CallbackBase connectorCallbacks) { - this.connectorCallbacks = connectorCallbacks; + public void setToCallback(CallbackBase toCallback) { + this.toCallback = toCallback; } - public Object getConfigConnectorConnection() { - return configConnectorConnection; + public Object getConnectorConnectionConfig(ConnectorType type) { + return connectorConnectionConfigs.get(type); } - public void setConfigConnectorConnection(Object config) { - configConnectorConnection = config; + public void setConnectorConnectionConfig(ConnectorType type, Object config) { + connectorConnectionConfigs.put(type, config); } - public Object getConfigConnectorJob() { - return configConnectorJob; + public Object getConnectorJobConfig(ConnectorType type) { + return connectorJobConfigs.get(type); } - public void setConfigConnectorJob(Object config) { - configConnectorJob = config; + public void setConnectorJobConfig(ConnectorType type, Object config) { + connectorJobConfigs.put(type, config); } - public Object getConfigFrameworkConnection() { - 
return configFrameworkConnection; + public Object getFrameworkConnectionConfig(ConnectorType type) { + return frameworkConnectionConfigs.get(type); } - public void setConfigFrameworkConnection(Object config) { - configFrameworkConnection = config; + public void setFrameworkConnectionConfig(ConnectorType type, Object config) { + frameworkConnectionConfigs.put(type, config); } public Object getConfigFrameworkJob() { @@ -219,22 +230,14 @@ public class SubmissionRequest { configFrameworkJob = config; } - public MutableMapContext getConnectorContext() { - return connectorContext; + public MutableMapContext getConnectorContext(ConnectorType type) { + return connectorContexts.get(type); } public MutableMapContext getFrameworkContext() { return frameworkContext; } - public String getOutputDirectory() { - return outputDirectory; - } - - public void setOutputDirectory(String outputDirectory) { - this.outputDirectory = outputDirectory; - } - public String getNotificationUrl() { return notificationUrl; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java b/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java new file mode 100644 index 0000000..7c653bf --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.framework.configuration; + +import org.apache.sqoop.model.ConfigurationClass; +import org.apache.sqoop.model.Form; + +@ConfigurationClass +public class JobConfiguration { + + @Form public ThrottlingForm throttling; + + public JobConfiguration() { + throttling = new ThrottlingForm(); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/core/src/main/java/org/apache/sqoop/repository/Repository.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java index ecf5004..5087a39 100644 --- a/core/src/main/java/org/apache/sqoop/repository/Repository.java +++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java @@ -18,6 +18,7 @@ package org.apache.sqoop.repository; import org.apache.log4j.Logger; +import org.apache.sqoop.common.ConnectorType; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.ConnectorManager; import org.apache.sqoop.connector.spi.MetadataUpgrader; @@ -37,7 +38,6 @@ import org.apache.sqoop.utils.ClassUtils; import org.apache.sqoop.validation.Validation; import org.apache.sqoop.validation.Validator; -import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; @@ -446,16 +446,18 @@ public abstract class Repository { // Make a new copy of the forms from the connector, // else the values will get set in the forms in the connector for // each connection. 
- List<MForm> forms = newConnector.getJobForms(job.getType()).clone(false).getForms(); - MJobForms newJobForms = new MJobForms(job.getType(), forms); - upgrader.upgrade(job.getConnectorPart(), newJobForms); - MJob newJob = new MJob(job, newJobForms, job.getFrameworkPart()); + List<MForm> forms = newConnector.getJobForms(ConnectorType.FROM).clone(false).getForms(); + MJobForms newJobForms = new MJobForms(forms); + upgrader.upgrade(job.getConnectorPart(ConnectorType.FROM), newJobForms); + // @TODO(Abe): Check From and To + MJob newJob = new MJob(job, newJobForms, job.getFrameworkPart(), newJobForms); // Transform form structures to objects for validations - Object newConfigurationObject = ClassUtils.instantiate(connector.getJobConfigurationClass(job.getType())); - FormUtils.fromForms(newJob.getConnectorPart().getForms(), newConfigurationObject); + // @TODO(Abe): Check From and To + Object newConfigurationObject = ClassUtils.instantiate(connector.getJobConfigurationClass(ConnectorType.FROM)); + FormUtils.fromForms(newJob.getConnectorPart(ConnectorType.FROM).getForms(), newConfigurationObject); - Validation validation = validator.validateJob(newJob.getType(), newConfigurationObject); + Validation validation = validator.validateJob(newConfigurationObject); if (validation.getStatus().canProceed()) { updateJob(newJob, tx); } else { @@ -509,6 +511,7 @@ public abstract class Repository { // Make a new copy of the forms from the connector, // else the values will get set in the forms in the connector for // each connection. + // @TODO(Abe): From/To connection forms. List<MForm> forms = framework.getConnectionForms().clone(false).getForms(); MConnectionForms newConnectionForms = new MConnectionForms(forms); upgrader.upgrade(connection.getFrameworkPart(), newConnectionForms); @@ -530,16 +533,16 @@ public abstract class Repository { // Make a new copy of the forms from the framework, // else the values will get set in the forms in the connector for // each connection. 
- List<MForm> forms = framework.getJobForms(job.getType()).clone(false).getForms(); - MJobForms newJobForms = new MJobForms(job.getType(), forms); + List<MForm> forms = framework.getJobForms().clone(false).getForms(); + MJobForms newJobForms = new MJobForms(forms); upgrader.upgrade(job.getFrameworkPart(), newJobForms); - MJob newJob = new MJob(job, job.getConnectorPart(), newJobForms); + MJob newJob = new MJob(job, job.getConnectorPart(ConnectorType.FROM), newJobForms, job.getConnectorPart(ConnectorType.TO)); // Transform form structures to objects for validations - Object newConfigurationObject = ClassUtils.instantiate(FrameworkManager.getInstance().getJobConfigurationClass(job.getType())); + Object newConfigurationObject = ClassUtils.instantiate(FrameworkManager.getInstance().getJobConfigurationClass()); FormUtils.fromForms(newJob.getFrameworkPart().getForms(), newConfigurationObject); - Validation validation = validator.validateJob(newJob.getType(), newConfigurationObject); + Validation validation = validator.validateJob(newConfigurationObject); if (validation.getStatus().canProceed()) { updateJob(newJob, tx); } else { http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java index 84f6213..82b195a 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java @@ -20,22 +20,14 @@ package org.apache.sqoop.execution.mapreduce; import com.google.common.util.concurrent.ThreadFactoryBuilder; import 
org.apache.hadoop.io.NullWritable; import org.apache.sqoop.common.MutableMapContext; -import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.framework.ExecutionEngine; import org.apache.sqoop.framework.SubmissionRequest; -import org.apache.sqoop.framework.configuration.ExportJobConfiguration; -import org.apache.sqoop.framework.configuration.ImportJobConfiguration; -import org.apache.sqoop.framework.configuration.OutputFormat; +import org.apache.sqoop.framework.configuration.JobConfiguration; import org.apache.sqoop.job.JobConstants; -import org.apache.sqoop.job.MapreduceExecutionError; -import org.apache.sqoop.job.etl.Exporter; -import org.apache.sqoop.job.etl.HdfsExportExtractor; -import org.apache.sqoop.job.etl.HdfsExportPartitioner; -import org.apache.sqoop.job.etl.HdfsSequenceImportLoader; -import org.apache.sqoop.job.etl.HdfsTextImportLoader; -import org.apache.sqoop.job.etl.Importer; +import org.apache.sqoop.job.etl.From; +import org.apache.sqoop.job.etl.To; +import org.apache.sqoop.job.io.Data; import org.apache.sqoop.job.io.SqoopWritable; -import org.apache.sqoop.job.mr.SqoopFileOutputFormat; import org.apache.sqoop.job.mr.SqoopInputFormat; import org.apache.sqoop.job.mr.SqoopMapper; import org.apache.sqoop.job.mr.SqoopNullOutputFormat; @@ -69,99 +61,66 @@ public class MapreduceExecutionEngine extends ExecutionEngine { request.setOutputValueClass(NullWritable.class); // Set up framework context + From from = (From)request.getFromCallback(); + To to = (To)request.getToCallback(); MutableMapContext context = request.getFrameworkContext(); + context.setString(JobConstants.JOB_ETL_PARTITIONER, from.getPartitioner().getName()); + context.setString(JobConstants.JOB_ETL_EXTRACTOR, from.getExtractor().getName()); + context.setString(JobConstants.JOB_ETL_LOADER, to.getLoader().getName()); + context.setString(JobConstants.JOB_ETL_DESTROYER, from.getDestroyer().getName()); context.setString(JobConstants.INTERMEDIATE_DATA_FORMAT, - 
request.getIntermediateDataFormat().getName()); + request.getIntermediateDataFormat().getName()); if(request.getExtractors() != null) { context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors()); } - } - - /** - * {@inheritDoc} - */ - @Override - public void prepareImportSubmission(SubmissionRequest gRequest) { - MRSubmissionRequest request = (MRSubmissionRequest) gRequest; - - prepareSubmission(request); - request.setOutputFormatClass(SqoopFileOutputFormat.class); - ImportJobConfiguration jobConf = (ImportJobConfiguration) request.getConfigFrameworkJob(); - - Importer importer = (Importer)request.getConnectorCallbacks(); - - // Set up framework context - MutableMapContext context = request.getFrameworkContext(); - context.setString(JobConstants.JOB_ETL_PARTITIONER, importer.getPartitioner().getName()); - context.setString(JobConstants.JOB_ETL_EXTRACTOR, importer.getExtractor().getName()); - context.setString(JobConstants.JOB_ETL_DESTROYER, importer.getDestroyer().getName()); - - // TODO: This settings should be abstracted to core module at some point - if(jobConf.output.outputFormat == OutputFormat.TEXT_FILE) { - context.setString(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName()); - } else if(jobConf.output.outputFormat == OutputFormat.SEQUENCE_FILE) { - context.setString(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName()); - } else { - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0024, - "Format: " + jobConf.output.outputFormat); - } - if(getCompressionCodecName(jobConf) != null) { - context.setString(JobConstants.HADOOP_COMPRESS_CODEC, - getCompressionCodecName(jobConf)); - context.setBoolean(JobConstants.HADOOP_COMPRESS, true); + if(request.getExtractors() != null) { + context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors()); } - } - private String getCompressionCodecName(ImportJobConfiguration jobConf) { - if(jobConf.output.compression == null) - return null; - 
switch(jobConf.output.compression) { - case NONE: - return null; - case DEFAULT: - return "org.apache.hadoop.io.compress.DefaultCodec"; - case DEFLATE: - return "org.apache.hadoop.io.compress.DeflateCodec"; - case GZIP: - return "org.apache.hadoop.io.compress.GzipCodec"; - case BZIP2: - return "org.apache.hadoop.io.compress.BZip2Codec"; - case LZO: - return "com.hadoop.compression.lzo.LzoCodec"; - case LZ4: - return "org.apache.hadoop.io.compress.Lz4Codec"; - case SNAPPY: - return "org.apache.hadoop.io.compress.SnappyCodec"; - case CUSTOM: - return jobConf.output.customCompression.trim(); - } - return null; + // @TODO(Abe): Move to HDFS connector. +// if(jobConf.output.outputFormat == OutputFormat.TEXT_FILE) { +// context.setString(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName()); +// } else if(jobConf.output.outputFormat == OutputFormat.SEQUENCE_FILE) { +// context.setString(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName()); +// } else { +// throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0024, +// "Format: " + jobConf.output.outputFormat); +// } +// if(getCompressionCodecName(jobConf) != null) { +// context.setString(JobConstants.HADOOP_COMPRESS_CODEC, +// getCompressionCodecName(jobConf)); +// context.setBoolean(JobConstants.HADOOP_COMPRESS, true); +// } } - /** - * {@inheritDoc} - */ - @Override - public void prepareExportSubmission(SubmissionRequest gRequest) { - MRSubmissionRequest request = (MRSubmissionRequest) gRequest; - ExportJobConfiguration jobConf = (ExportJobConfiguration) request.getConfigFrameworkJob(); - - prepareSubmission(request); - - Exporter exporter = (Exporter)request.getConnectorCallbacks(); - - // Set up framework context - MutableMapContext context = request.getFrameworkContext(); - context.setString(JobConstants.JOB_ETL_PARTITIONER, HdfsExportPartitioner.class.getName()); - context.setString(JobConstants.JOB_ETL_LOADER, exporter.getLoader().getName()); - 
context.setString(JobConstants.JOB_ETL_DESTROYER, exporter.getDestroyer().getName()); - - // Extractor that will be able to read all supported file types - context.setString(JobConstants.JOB_ETL_EXTRACTOR, HdfsExportExtractor.class.getName()); - context.setString(JobConstants.HADOOP_INPUTDIR, jobConf.input.inputDirectory); - } + // @TODO(Abe): Move to HDFS connector. +// private String getCompressionCodecName(ImportJobConfiguration jobConf) { +// if(jobConf.output.compression == null) +// return null; +// switch(jobConf.output.compression) { +// case NONE: +// return null; +// case DEFAULT: +// return "org.apache.hadoop.io.compress.DefaultCodec"; +// case DEFLATE: +// return "org.apache.hadoop.io.compress.DeflateCodec"; +// case GZIP: +// return "org.apache.hadoop.io.compress.GzipCodec"; +// case BZIP2: +// return "org.apache.hadoop.io.compress.BZip2Codec"; +// case LZO: +// return "com.hadoop.compression.lzo.LzoCodec"; +// case LZ4: +// return "org.apache.hadoop.io.compress.Lz4Codec"; +// case SNAPPY: +// return "org.apache.hadoop.io.compress.SnappyCodec"; +// case CUSTOM: +// return jobConf.output.customCompression.trim(); +// } +// return null; +// } /** * Our execution engine have additional dependencies that needs to be available http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java index b2fa15d..4cdb002 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java @@ -51,8 +51,11 @@ public final class JobConstants extends Constants { public static final String JOB_ETL_EXTRACTOR_NUM = PREFIX_JOB_CONFIG + "etl.extractor.count"; - public static final 
String PREFIX_CONNECTOR_CONTEXT = - PREFIX_JOB_CONFIG + "connector.context."; + public static final String PREFIX_CONNECTOR_FROM_CONTEXT = + PREFIX_JOB_CONFIG + "connector.from.context."; + + public static final String PREFIX_CONNECTOR_TO_CONTEXT = + PREFIX_JOB_CONFIG + "connector.to.context."; // Hadoop specific constants // We're using constants from Hadoop 1. Hadoop 2 has different names, but
