SQOOP-1376: Sqoop2: From/To: Refactor connector interface (Abraham Elmahrek via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/ba81ec7f Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/ba81ec7f Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/ba81ec7f Branch: refs/heads/SQOOP-1367 Commit: ba81ec7f890e8059ad90b3708b56084ed12e5244 Parents: c810826 Author: Jarek Jarcec Cecho <[email protected]> Authored: Fri Aug 8 13:11:35 2014 -0700 Committer: Abraham Elmahrek <[email protected]> Committed: Mon Aug 11 15:13:24 2014 -0700 ---------------------------------------------------------------------- .../org/apache/sqoop/client/SqoopClient.java | 44 +- .../sqoop/client/request/ConnectionRequest.java | 18 +- .../apache/sqoop/client/request/JobRequest.java | 10 +- .../sqoop/client/request/SqoopRequests.java | 14 +- .../org/apache/sqoop/common/ConnectorType.java | 30 + .../sqoop/json/ConnectionValidationBean.java | 143 +++++ .../org/apache/sqoop/json/ConnectorBean.java | 37 +- .../org/apache/sqoop/json/FrameworkBean.java | 29 +- .../java/org/apache/sqoop/json/JobBean.java | 51 +- .../apache/sqoop/json/JobValidationBean.java | 157 +++++ .../org/apache/sqoop/json/ValidationBean.java | 143 ----- .../java/org/apache/sqoop/model/MConnector.java | 72 ++- .../java/org/apache/sqoop/model/MFramework.java | 55 +- .../main/java/org/apache/sqoop/model/MJob.java | 131 ++-- .../java/org/apache/sqoop/model/MJobForms.java | 30 +- .../connector/jdbc/GenericJdbcConnector.java | 46 +- .../jdbc/GenericJdbcConnectorConstants.java | 6 +- .../jdbc/GenericJdbcExportDestroyer.java | 62 -- .../jdbc/GenericJdbcExportInitializer.java | 222 ------- .../connector/jdbc/GenericJdbcExportLoader.java | 76 --- .../connector/jdbc/GenericJdbcExtractor.java | 78 +++ .../jdbc/GenericJdbcFromDestroyer.java | 36 ++ .../jdbc/GenericJdbcFromInitializer.java | 322 ++++++++++ .../jdbc/GenericJdbcImportDestroyer.java | 36 -- .../jdbc/GenericJdbcImportExtractor.java | 78 --- .../jdbc/GenericJdbcImportInitializer.java | 322 ---------- .../jdbc/GenericJdbcImportPartition.java | 53 -- .../jdbc/GenericJdbcImportPartitioner.java | 605 ------------------- .../sqoop/connector/jdbc/GenericJdbcLoader.java | 76 +++ .../connector/jdbc/GenericJdbcPartition.java | 53 ++ .../connector/jdbc/GenericJdbcPartitioner.java | 604 ++++++++++++++++++ .../connector/jdbc/GenericJdbcToDestroyer.java | 62 ++ .../jdbc/GenericJdbcToInitializer.java | 222 +++++++ .../connector/jdbc/GenericJdbcValidator.java | 24 +- .../configuration/ExportJobConfiguration.java | 33 - .../jdbc/configuration/ExportTableForm.java | 34 -- .../configuration/FromJobConfiguration.java | 33 + .../jdbc/configuration/FromTableForm.java | 35 ++ .../configuration/ImportJobConfiguration.java | 33 - .../jdbc/configuration/ImportTableForm.java | 35 -- .../jdbc/configuration/ToJobConfiguration.java | 33 + .../jdbc/configuration/ToTableForm.java | 34 ++ .../connector/jdbc/TestExportInitializer.java | 2 +- .../sqoop/connector/jdbc/TestExportLoader.java | 2 +- .../connector/jdbc/TestImportExtractor.java | 2 +- .../connector/jdbc/TestImportInitializer.java | 2 +- .../connector/jdbc/TestImportPartitioner.java | 2 +- .../connector/mysqljdbc/MySqlJdbcConnector.java | 8 +- .../sqoop/connector/ConnectorHandler.java | 22 +- .../apache/sqoop/framework/ExecutionEngine.java | 10 +- .../sqoop/framework/FrameworkManager.java | 32 +- .../sqoop/framework/FrameworkValidator.java | 102 ++-- .../org/apache/sqoop/framework/JobManager.java | 244 +++++--- .../sqoop/framework/SubmissionRequest.java | 109 ++-- .../configuration/JobConfiguration.java | 31 + .../org/apache/sqoop/repository/Repository.java | 29 +- .../mapreduce/MapreduceExecutionEngine.java | 147 ++--- .../java/org/apache/sqoop/job/JobConstants.java | 7 +- .../sqoop/job/etl/HdfsExportExtractor.java | 302 ++++----- .../apache/sqoop/job/mr/ConfigurationUtils.java | 187 ++++-- .../sqoop/job/mr/SqoopDestroyerExecutor.java | 14 +- .../apache/sqoop/job/mr/SqoopInputFormat.java | 10 +- .../org/apache/sqoop/job/mr/SqoopMapper.java | 27 +- .../job/mr/SqoopOutputFormatLoadExecutor.java | 24 +- .../org/apache/sqoop/job/TestHdfsExtract.java | 2 +- .../derby/DerbyRepositoryHandler.java | 278 ++++++--- .../repository/derby/DerbySchemaConstants.java | 4 +- .../repository/derby/DerbySchemaQuery.java | 72 ++- .../sqoop/handler/ConnectionRequestHandler.java | 6 +- .../apache/sqoop/handler/JobRequestHandler.java | 63 +- .../apache/sqoop/shell/CloneJobFunction.java | 8 +- .../apache/sqoop/shell/CreateJobFunction.java | 37 +- .../sqoop/shell/DeleteConnectionFunction.java | 2 +- .../sqoop/shell/ShowConnectionFunction.java | 8 +- .../org/apache/sqoop/shell/ShowJobFunction.java | 27 +- .../apache/sqoop/shell/UpdateJobFunction.java | 8 +- .../org/apache/sqoop/shell/core/Constants.java | 22 +- .../apache/sqoop/shell/utils/FormDisplayer.java | 37 +- .../apache/sqoop/shell/utils/FormFiller.java | 56 +- .../shell/utils/JobDynamicFormOptions.java | 6 +- .../main/resources/shell-resource.properties | 9 +- .../sqoop/connector/spi/SqoopConnector.java | 16 +- .../java/org/apache/sqoop/job/etl/Exporter.java | 51 -- .../java/org/apache/sqoop/job/etl/From.java | 58 ++ .../java/org/apache/sqoop/job/etl/Importer.java | 58 -- .../main/java/org/apache/sqoop/job/etl/To.java | 51 ++ .../org/apache/sqoop/validation/Validator.java | 3 +- .../mapreduce/MapreduceSubmissionEngine.java | 34 +- 88 files changed, 3446 insertions(+), 3002 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/client/src/main/java/org/apache/sqoop/client/SqoopClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/sqoop/client/SqoopClient.java b/client/src/main/java/org/apache/sqoop/client/SqoopClient.java index 05ea6d6..b42f234 100644 --- a/client/src/main/java/org/apache/sqoop/client/SqoopClient.java +++ b/client/src/main/java/org/apache/sqoop/client/SqoopClient.java @@ -18,10 +18,12 @@ package org.apache.sqoop.client; import org.apache.sqoop.client.request.SqoopRequests; +import org.apache.sqoop.common.ConnectorType; import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.json.ConnectionValidationBean; import org.apache.sqoop.json.ConnectorBean; import org.apache.sqoop.json.FrameworkBean; -import org.apache.sqoop.json.ValidationBean; +import org.apache.sqoop.json.JobValidationBean; import org.apache.sqoop.model.FormUtils; import org.apache.sqoop.model.MConnection; import org.apache.sqoop.model.MConnector; @@ -351,21 +353,24 @@ public class SqoopClient { } /** - * Create new job of given type and for given connection. + * Create new job the for given connections. * - * @param xid Connection id - * @param type Job type + * @param fromXid From Connection id + * @param toXid To Connection id * @return */ - public MJob newJob(long xid, MJob.Type type) { - MConnection connection = getConnection(xid); + public MJob newJob(long fromXid, long toXid) { + MConnection fromConnection = getConnection(fromXid); + MConnection toConnection = getConnection(toXid); return new MJob( - connection.getConnectorId(), - connection.getPersistenceId(), - type, - getConnector(connection.getConnectorId()).getJobForms(type), - getFramework().getJobForms(type) + fromConnection.getConnectorId(), + toConnection.getConnectorId(), + fromConnection.getPersistenceId(), + toConnection.getPersistenceId(), + getConnector(fromConnection.getConnectorId()).getJobForms(ConnectorType.FROM), + getConnector(fromConnection.getConnectorId()).getJobForms(ConnectorType.TO), + getFramework().getJobForms() ); } @@ -529,7 +534,7 @@ public class SqoopClient { return requests.readHistory(jid).getSubmissions(); } - private Status applyValidations(ValidationBean bean, MConnection connection) { + private Status applyValidations(ConnectionValidationBean bean, MConnection connection) { Validation connector = bean.getConnectorValidation(); Validation framework = bean.getFrameworkValidation(); @@ -544,18 +549,25 @@ public class SqoopClient { return Status.getWorstStatus(connector.getStatus(), framework.getStatus()); } - private Status applyValidations(ValidationBean bean, MJob job) { - Validation connector = bean.getConnectorValidation(); + private Status applyValidations(JobValidationBean bean, MJob job) { + Validation fromConnector = bean.getConnectorValidation(ConnectorType.FROM); + Validation toConnector = bean.getConnectorValidation(ConnectorType.TO); Validation framework = bean.getFrameworkValidation(); - FormUtils.applyValidation(job.getConnectorPart().getForms(), connector); + // @TODO(Abe): From/To validation. + FormUtils.applyValidation( + job.getConnectorPart(ConnectorType.FROM).getForms(), + fromConnector); FormUtils.applyValidation(job.getFrameworkPart().getForms(), framework); + FormUtils.applyValidation( + job.getConnectorPart(ConnectorType.TO).getForms(), + toConnector); Long id = bean.getId(); if(id != null) { job.setPersistenceId(id); } - return Status.getWorstStatus(connector.getStatus(), framework.getStatus()); + return Status.getWorstStatus(fromConnector.getStatus(), framework.getStatus(), toConnector.getStatus()); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/client/src/main/java/org/apache/sqoop/client/request/ConnectionRequest.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/sqoop/client/request/ConnectionRequest.java b/client/src/main/java/org/apache/sqoop/client/request/ConnectionRequest.java index f523abb..e0740a9 100644 --- a/client/src/main/java/org/apache/sqoop/client/request/ConnectionRequest.java +++ b/client/src/main/java/org/apache/sqoop/client/request/ConnectionRequest.java @@ -18,7 +18,7 @@ package org.apache.sqoop.client.request; import org.apache.sqoop.json.ConnectionBean; -import org.apache.sqoop.json.ValidationBean; +import org.apache.sqoop.json.ConnectionValidationBean; import org.apache.sqoop.model.MConnection; import org.json.simple.JSONObject; import org.json.simple.JSONValue; @@ -49,7 +49,7 @@ public class ConnectionRequest extends Request { return connectionBean; } - public ValidationBean create(String serverUrl, MConnection connection) { + public ConnectionValidationBean create(String serverUrl, MConnection connection) { ConnectionBean connectionBean = new ConnectionBean(connection); @@ -59,13 +59,13 @@ public class ConnectionRequest extends Request { String response = super.post(serverUrl + RESOURCE, connectionJson.toJSONString()); - ValidationBean validationBean = new ValidationBean(); - validationBean.restore((JSONObject) JSONValue.parse(response)); + ConnectionValidationBean connectionValidationBean = new ConnectionValidationBean(); + connectionValidationBean.restore((JSONObject) JSONValue.parse(response)); - return validationBean; + return connectionValidationBean; } - public ValidationBean update(String serverUrl, MConnection connection) { + public ConnectionValidationBean update(String serverUrl, MConnection connection) { ConnectionBean connectionBean = new ConnectionBean(connection); @@ -76,10 +76,10 @@ public class ConnectionRequest extends Request { + connection.getPersistenceId(), connectionJson.toJSONString()); - ValidationBean validationBean = new ValidationBean(); - validationBean.restore((JSONObject) JSONValue.parse(response)); + ConnectionValidationBean connectionValidationBean = new ConnectionValidationBean(); + connectionValidationBean.restore((JSONObject) JSONValue.parse(response)); - return validationBean; + return connectionValidationBean; } public void delete(String serverUrl, Long id) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/client/src/main/java/org/apache/sqoop/client/request/JobRequest.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/sqoop/client/request/JobRequest.java b/client/src/main/java/org/apache/sqoop/client/request/JobRequest.java index 6dee2c8..b824512 100644 --- a/client/src/main/java/org/apache/sqoop/client/request/JobRequest.java +++ b/client/src/main/java/org/apache/sqoop/client/request/JobRequest.java @@ -18,7 +18,7 @@ package org.apache.sqoop.client.request; import org.apache.sqoop.json.JobBean; -import org.apache.sqoop.json.ValidationBean; +import org.apache.sqoop.json.JobValidationBean; import org.apache.sqoop.model.MJob; import org.json.simple.JSONObject; import org.json.simple.JSONValue; @@ -49,7 +49,7 @@ public class JobRequest extends Request { return jobBean; } - public ValidationBean create(String serverUrl, MJob job) { + public JobValidationBean create(String serverUrl, MJob job) { JobBean jobBean = new JobBean(job); @@ -59,13 +59,13 @@ public class JobRequest extends Request { String response = super.post(serverUrl + RESOURCE, jobJson.toJSONString()); - ValidationBean validationBean = new ValidationBean(); + JobValidationBean validationBean = new JobValidationBean(); validationBean.restore((JSONObject) JSONValue.parse(response)); return validationBean; } - public ValidationBean update(String serverUrl, MJob job) { + public JobValidationBean update(String serverUrl, MJob job) { JobBean jobBean = new JobBean(job); @@ -75,7 +75,7 @@ public class JobRequest extends Request { String response = super.put(serverUrl + RESOURCE + job.getPersistenceId(), jobJson.toJSONString()); - ValidationBean validationBean = new ValidationBean(); + JobValidationBean validationBean = new JobValidationBean(); validationBean.restore((JSONObject) JSONValue.parse(response)); return validationBean; http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/client/src/main/java/org/apache/sqoop/client/request/SqoopRequests.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/sqoop/client/request/SqoopRequests.java b/client/src/main/java/org/apache/sqoop/client/request/SqoopRequests.java index ffaa84f..d87bb78 100644 --- a/client/src/main/java/org/apache/sqoop/client/request/SqoopRequests.java +++ b/client/src/main/java/org/apache/sqoop/client/request/SqoopRequests.java @@ -18,16 +18,14 @@ package org.apache.sqoop.client.request; import org.apache.sqoop.json.ConnectionBean; +import org.apache.sqoop.json.ConnectionValidationBean; import org.apache.sqoop.json.ConnectorBean; import org.apache.sqoop.json.FrameworkBean; import org.apache.sqoop.json.JobBean; +import org.apache.sqoop.json.JobValidationBean; import org.apache.sqoop.json.SubmissionBean; -import org.apache.sqoop.json.ValidationBean; -import org.apache.sqoop.model.FormUtils; import org.apache.sqoop.model.MConnection; import org.apache.sqoop.model.MJob; -import org.apache.sqoop.validation.Status; -import org.apache.sqoop.validation.Validation; /** * Unified class for all request objects. @@ -94,7 +92,7 @@ public class SqoopRequests { return getConnectorRequest().read(serverUrl, cid); } - public ValidationBean createConnection(MConnection connection) { + public ConnectionValidationBean createConnection(MConnection connection) { return getConnectionRequest().create(serverUrl, connection); } @@ -102,7 +100,7 @@ public class SqoopRequests { return getConnectionRequest().read(serverUrl, connectionId); } - public ValidationBean updateConnection(MConnection connection) { + public ConnectionValidationBean updateConnection(MConnection connection) { return getConnectionRequest().update(serverUrl, connection); } @@ -114,7 +112,7 @@ public class SqoopRequests { getConnectionRequest().delete(serverUrl, xid); } - public ValidationBean createJob(MJob job) { + public JobValidationBean createJob(MJob job) { return getJobRequest().create(serverUrl, job); } @@ -122,7 +120,7 @@ public class SqoopRequests { return getJobRequest().read(serverUrl, jobId); } - public ValidationBean updateJob(MJob job) { + public JobValidationBean updateJob(MJob job) { return getJobRequest().update(serverUrl, job); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/common/src/main/java/org/apache/sqoop/common/ConnectorType.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/common/ConnectorType.java b/common/src/main/java/org/apache/sqoop/common/ConnectorType.java new file mode 100644 index 0000000..d3d1d19 --- /dev/null +++ b/common/src/main/java/org/apache/sqoop/common/ConnectorType.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.common; + +/** + * Connectors will have configurations for FROM and TO. + * If the connector is being used to extract data FROM, + * then the connector type will be FROM. If the connector + * is being used to load data TO, then the connector type + * will be TO. + */ +public enum ConnectorType { + FROM, + TO +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/common/src/main/java/org/apache/sqoop/json/ConnectionValidationBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/json/ConnectionValidationBean.java b/common/src/main/java/org/apache/sqoop/json/ConnectionValidationBean.java new file mode 100644 index 0000000..ffdd13e --- /dev/null +++ b/common/src/main/java/org/apache/sqoop/json/ConnectionValidationBean.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.json; + +import org.apache.sqoop.validation.Status; +import org.apache.sqoop.validation.Validation; +import org.json.simple.JSONObject; + +import java.util.HashMap; +import java.util.Map; + +/** + * Bean for sending validations across network. This bean will move two + * validation objects at one time - one for connector and second for framework + * part of validated entity. Optionally validation bean can also transfer + * created persistent id in case that new entity was created. + */ +public class ConnectionValidationBean implements JsonBean { + + private static final String ID = "id"; + private static final String FRAMEWORK = "framework"; + private static final String CONNECTOR = "connector"; + private static final String STATUS = "status"; + private static final String MESSAGE = "message"; + private static final String MESSAGES = "messages"; + + private Long id; + private Validation connectorValidation; + private Validation frameworkValidation; + + // For "extract" + public ConnectionValidationBean(Validation connector, Validation framework) { + this(); + + this.connectorValidation = connector; + this.frameworkValidation = framework; + } + + // For "restore" + public ConnectionValidationBean() { + id = null; + } + + public Validation getConnectorValidation() { + return connectorValidation; + } + + public Validation getFrameworkValidation() { + return frameworkValidation; + } + + public void setId(Long id) { + this.id = id; + } + + public Long getId() { + return id; + } + + @SuppressWarnings("unchecked") + public JSONObject extract(boolean skipSensitive) { + JSONObject object = new JSONObject(); + + // Optionally transfer id + if(id != null) { + object.put(ID, id); + } + + object.put(CONNECTOR, extractValidation(connectorValidation)); + object.put(FRAMEWORK, extractValidation(frameworkValidation)); + + return object; + } + + @SuppressWarnings("unchecked") + private JSONObject extractValidation(Validation validation) { + JSONObject object = new JSONObject(); + + object.put(STATUS, validation.getStatus().name()); + + JSONObject jsonMessages = new JSONObject(); + Map<Validation.FormInput, Validation.Message> messages = validation.getMessages(); + + for(Map.Entry<Validation.FormInput, Validation.Message> entry : messages.entrySet()) { + JSONObject jsonEntry = new JSONObject(); + jsonEntry.put(STATUS, entry.getValue().getStatus().name()); + jsonEntry.put(MESSAGE, entry.getValue().getMessage()); + jsonMessages.put(entry.getKey(), jsonEntry); + } + + object.put(MESSAGES, jsonMessages); + + return object; + } + + @Override + public void restore(JSONObject jsonObject) { + // Optional and accepting NULLs + id = (Long) jsonObject.get(ID); + + connectorValidation = restoreValidation( + (JSONObject)jsonObject.get(CONNECTOR)); + frameworkValidation = restoreValidation( + (JSONObject)jsonObject.get(FRAMEWORK)); + } + + public Validation restoreValidation(JSONObject jsonObject) { + JSONObject jsonMessages = (JSONObject) jsonObject.get(MESSAGES); + Map<Validation.FormInput, Validation.Message> messages + = new HashMap<Validation.FormInput, Validation.Message>(); + + for(Object key : jsonMessages.keySet()) { + JSONObject jsonMessage = (JSONObject) jsonMessages.get(key); + + Status status = Status.valueOf((String) jsonMessage.get(STATUS)); + String stringMessage = (String) jsonMessage.get(MESSAGE); + + Validation.Message message + = new Validation.Message(status, stringMessage); + + messages.put(new Validation.FormInput((String)key), message); + } + + Status status = Status.valueOf((String) jsonObject.get(STATUS)); + + return new Validation(status, messages); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/common/src/main/java/org/apache/sqoop/json/ConnectorBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/json/ConnectorBean.java b/common/src/main/java/org/apache/sqoop/json/ConnectorBean.java index cbe049a..ed1de6e 100644 --- a/common/src/main/java/org/apache/sqoop/json/ConnectorBean.java +++ b/common/src/main/java/org/apache/sqoop/json/ConnectorBean.java @@ -24,8 +24,8 @@ import java.util.Map; import java.util.ResourceBundle; import java.util.Set; +import org.apache.sqoop.common.ConnectorType; import org.apache.sqoop.model.MConnectionForms; -import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MJobForms; import org.apache.sqoop.model.MConnector; import org.apache.sqoop.model.MForm; @@ -73,14 +73,13 @@ public class ConnectorBean implements JsonBean { object.put(NAME, connector.getUniqueName()); object.put(CLASS, connector.getClassName()); object.put(VERSION, connector.getVersion()); - object.put(CON_FORMS, extractForms(connector.getConnectionForms().getForms(), skipSensitive)); - - JSONObject jobForms = new JSONObject(); - for (MJobForms job : connector.getAllJobsForms().values()) { - jobForms.put(job.getType().name(), extractForms(job.getForms(), skipSensitive)); - } - object.put(JOB_FORMS, jobForms); + object.put(CON_FORMS, extractForms(connector.getConnectionForms().getForms(), skipSensitive)); + object.put(JOB_FORMS, new JSONObject()); + ((JSONObject)object.get(JOB_FORMS)).put( + ConnectorType.FROM, extractForms(connector.getJobForms(ConnectorType.FROM).getForms(), skipSensitive)); + ((JSONObject)object.get(JOB_FORMS)).put( + ConnectorType.TO, extractForms(connector.getJobForms(ConnectorType.TO).getForms(), skipSensitive)); array.add(object); } @@ -119,17 +118,17 @@ public class ConnectorBean implements JsonBean { List<MForm> connForms = restoreForms((JSONArray) object.get(CON_FORMS)); JSONObject jobJson = (JSONObject) object.get(JOB_FORMS); - List<MJobForms> jobs = new ArrayList<MJobForms>(); - for( Map.Entry entry : (Set<Map.Entry>) jobJson.entrySet()) { - MJob.Type type = MJob.Type.valueOf((String) entry.getKey()); - - List<MForm> jobForms = - restoreForms((JSONArray) jobJson.get(entry.getKey())); - - jobs.add(new MJobForms(type, jobForms)); - } - - MConnector connector = new MConnector(uniqueName, className, version, new MConnectionForms(connForms), jobs); + JSONArray fromJobJson = (JSONArray)jobJson.get(ConnectorType.FROM.name()); + JSONArray toJobJson = (JSONArray)jobJson.get(ConnectorType.TO.name()); + List<MForm> fromJobForms = + restoreForms(fromJobJson); + List<MForm> toJobForms = + restoreForms(toJobJson); + MJobForms fromJob = new MJobForms(fromJobForms); + MJobForms toJob = new MJobForms(toJobForms); + MConnectionForms connection = new MConnectionForms(connForms); + + MConnector connector = new MConnector(uniqueName, className, version, connection, fromJob, toJob); connector.setPersistenceId(connectorId); connectors.add(connector); http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/common/src/main/java/org/apache/sqoop/json/FrameworkBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/json/FrameworkBean.java b/common/src/main/java/org/apache/sqoop/json/FrameworkBean.java index eb79f98..abbdcc6 100644 --- a/common/src/main/java/org/apache/sqoop/json/FrameworkBean.java +++ b/common/src/main/java/org/apache/sqoop/json/FrameworkBean.java @@ -18,6 +18,7 @@ package org.apache.sqoop.json; import org.apache.sqoop.model.MConnectionForms; +import org.apache.sqoop.model.MConnector; import org.apache.sqoop.model.MForm; import org.apache.sqoop.model.MFramework; import org.apache.sqoop.model.MJob; @@ -65,13 +66,10 @@ public class FrameworkBean implements JsonBean { @SuppressWarnings("unchecked") @Override public JSONObject extract(boolean skipSensitive) { + // @TODO(Abe): Add From/To connection forms. JSONArray conForms = extractForms(framework.getConnectionForms().getForms(), skipSensitive); - JSONObject jobForms = new JSONObject(); - - for (MJobForms job : framework.getAllJobsForms().values()) { - jobForms.put(job.getType().name(), extractForms(job.getForms(), skipSensitive)); - } + JSONArray jobForms = extractForms(framework.getJobForms().getForms(), skipSensitive); JSONObject result = new JSONObject(); result.put(ID, framework.getPersistenceId()); @@ -89,22 +87,13 @@ public class FrameworkBean implements JsonBean { String frameworkVersion = (String) jsonObject.get(FRAMEWORK_VERSION); List<MForm> connForms = restoreForms((JSONArray) jsonObject.get(CON_FORMS)); + List<MForm> jobForms = restoreForms((JSONArray) jsonObject.get(JOB_FORMS)); - JSONObject jobForms = (JSONObject) jsonObject.get(JOB_FORMS); - - List<MJobForms> jobs = new ArrayList<MJobForms>(); - for( Map.Entry entry : (Set<Map.Entry>) jobForms.entrySet()) { - //TODO(jarcec): Handle situation when server is supporting operation - // that client do not know (server do have newer version than client) - MJob.Type type = MJob.Type.valueOf((String) entry.getKey()); - - List<MForm> job = restoreForms((JSONArray) entry.getValue()); - - jobs.add(new MJobForms(type, job)); - } - - framework = new MFramework(new MConnectionForms(connForms), jobs, - frameworkVersion); + // @TODO(Abe): Get From/To connection forms. + framework = new MFramework( + new MConnectionForms(connForms), + new MJobForms(jobForms), + frameworkVersion); framework.setPersistenceId(id); bundle = restoreResourceBundle((JSONObject) jsonObject.get(RESOURCES)); http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/common/src/main/java/org/apache/sqoop/json/JobBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/json/JobBean.java b/common/src/main/java/org/apache/sqoop/json/JobBean.java index 1555bd5..cb659ae 100644 --- a/common/src/main/java/org/apache/sqoop/json/JobBean.java +++ b/common/src/main/java/org/apache/sqoop/json/JobBean.java @@ -17,6 +17,7 @@ */ package org.apache.sqoop.json; +import org.apache.sqoop.common.ConnectorType; import org.apache.sqoop.model.MForm; import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MJobForms; @@ -42,10 +43,12 @@ public class JobBean implements JsonBean { private static final String ALL = "all"; private static final String ID = "id"; private static final String NAME = "name"; - private static final String TYPE = "type"; - private static final String CONNECTION_ID = "connection-id"; - private static final String CONNECTOR_ID = "connector-id"; - private static final String CONNECTOR_PART = "connector"; + private static final String FROM_CONNECTION_ID = "from-connection-id"; + private static final String TO_CONNECTION_ID = "to-connection-id"; + private static final String FROM_CONNECTOR_ID = "from-connector-id"; + private static final String TO_CONNECTOR_ID = "to-connector-id"; + private static final String FROM_CONNECTOR_PART = "from-connector"; + private static final String TO_CONNECTOR_PART = "to-connector"; private static final String FRAMEWORK_PART = "framework"; // Compulsory @@ -106,16 +109,19 @@ public class JobBean implements JsonBean { object.put(ID, job.getPersistenceId()); object.put(NAME, job.getName()); - object.put(TYPE, job.getType().name()); object.put(ENABLED, job.getEnabled()); object.put(CREATION_USER, job.getCreationUser()); object.put(CREATION_DATE, job.getCreationDate().getTime()); object.put(UPDATE_USER, job.getLastUpdateUser()); object.put(UPDATE_DATE, job.getLastUpdateDate().getTime()); - object.put(CONNECTION_ID, job.getConnectionId()); - object.put(CONNECTOR_ID, job.getConnectorId()); - object.put(CONNECTOR_PART, - extractForms(job.getConnectorPart().getForms(), skipSensitive)); + object.put(FROM_CONNECTION_ID, job.getConnectionId(ConnectorType.FROM)); + object.put(TO_CONNECTION_ID, job.getConnectionId(ConnectorType.TO)); + object.put(FROM_CONNECTOR_ID, job.getConnectorId(ConnectorType.FROM)); + object.put(TO_CONNECTOR_ID, job.getConnectorId(ConnectorType.TO)); + object.put(FROM_CONNECTOR_PART, + extractForms(job.getConnectorPart(ConnectorType.FROM).getForms(),skipSensitive)); + object.put(TO_CONNECTOR_PART, + extractForms(job.getConnectorPart(ConnectorType.TO).getForms(), skipSensitive)); object.put(FRAMEWORK_PART, extractForms(job.getFrameworkPart().getForms(), skipSensitive)); @@ -151,23 +157,26 @@ public class JobBean implements JsonBean { for (Object obj : array) { JSONObject object = (JSONObject) obj; - long connectorId = (Long) object.get(CONNECTOR_ID); - long connectionId = (Long) object.get(CONNECTION_ID); - JSONArray connectorPart = (JSONArray) object.get(CONNECTOR_PART); + long fromConnectorId = (Long) object.get(FROM_CONNECTOR_ID); + long toConnectorId = (Long) object.get(TO_CONNECTOR_ID); + long fromConnectionId = (Long) object.get(FROM_CONNECTION_ID); + long toConnectionId = (Long) object.get(TO_CONNECTION_ID); + JSONArray fromConnectorPart = (JSONArray) object.get(FROM_CONNECTOR_PART); + JSONArray toConnectorPart = (JSONArray) object.get(TO_CONNECTOR_PART); JSONArray frameworkPart = (JSONArray) object.get(FRAMEWORK_PART); - String stringType = (String) object.get(TYPE); - MJob.Type type = MJob.Type.valueOf(stringType); - - List<MForm> connectorForms = restoreForms(connectorPart); + List<MForm> fromConnectorParts = restoreForms(fromConnectorPart); + List<MForm> toConnectorParts = restoreForms(toConnectorPart); List<MForm> frameworkForms = restoreForms(frameworkPart); MJob job = new MJob( - connectorId, - connectionId, - type, - new MJobForms(type, connectorForms), - new MJobForms(type, frameworkForms) + fromConnectorId, + toConnectorId, + fromConnectionId, + toConnectionId, + new MJobForms(fromConnectorParts), + new MJobForms(toConnectorParts), + new MJobForms(frameworkForms) ); job.setPersistenceId((Long) object.get(ID)); http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/common/src/main/java/org/apache/sqoop/json/JobValidationBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/json/JobValidationBean.java b/common/src/main/java/org/apache/sqoop/json/JobValidationBean.java new file mode 100644 index 0000000..95c24ff --- /dev/null +++ b/common/src/main/java/org/apache/sqoop/json/JobValidationBean.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.json; + +import org.apache.sqoop.common.ConnectorType; +import org.apache.sqoop.validation.Status; +import org.apache.sqoop.validation.Validation; +import org.json.simple.JSONObject; + +import java.util.HashMap; +import java.util.Map; + +/** + * Bean for sending validations across network. This bean will move two + * validation objects at one time - one for connector and second for framework + * part of validated entity. Optionally validation bean can also transfer + * created persistent id in case that new entity was created. + */ +public class JobValidationBean implements JsonBean { + + private static final String ID = "id"; + private static final String FRAMEWORK = "framework"; + private static final String CONNECTOR = "connector"; + private static final String FROM = "from"; + private static final String TO = "to"; + private static final String STATUS = "status"; + private static final String MESSAGE = "message"; + private static final String MESSAGES = "messages"; + + private Long id; + private Map<ConnectorType, Validation> connectorValidation; + private Validation frameworkValidation; + + // For "extract" + public JobValidationBean(Validation fromConnector, Validation framework, Validation toConnector) { + this(); + + this.connectorValidation = new HashMap<ConnectorType, Validation>(); + this.connectorValidation.put(ConnectorType.FROM, fromConnector); + this.connectorValidation.put(ConnectorType.TO, toConnector); + this.frameworkValidation = framework; + } + + // For "restore" + public JobValidationBean() { + id = null; + connectorValidation = new HashMap<ConnectorType, Validation>(); + } + + public Validation getConnectorValidation(ConnectorType type) { + return connectorValidation.get(type); + } + + public Validation getFrameworkValidation() { + return frameworkValidation; + } + + public void setId(Long id) { + this.id = id; + } + + public Long getId() { + return id; + } + + @SuppressWarnings("unchecked") + public JSONObject extract(boolean skipSensitive) { + JSONObject object = new JSONObject(); + JSONObject connectorObject = new JSONObject(); + + // Optionally transfer id + if(id != null) { + object.put(ID, id); + } + + connectorObject.put(FROM, extractValidation(getConnectorValidation(ConnectorType.FROM))); + connectorObject.put(TO, extractValidation(getConnectorValidation(ConnectorType.TO))); + + object.put(FRAMEWORK, extractValidation(frameworkValidation)); + object.put(CONNECTOR, connectorObject); + + return object; + } + + @SuppressWarnings("unchecked") + private JSONObject extractValidation(Validation validation) { + JSONObject object = new JSONObject(); + + object.put(STATUS, validation.getStatus().name()); + + JSONObject jsonMessages = new JSONObject(); + Map<Validation.FormInput, Validation.Message> messages = validation.getMessages(); + + for(Map.Entry<Validation.FormInput, Validation.Message> entry : messages.entrySet()) { + JSONObject jsonEntry = new JSONObject(); + jsonEntry.put(STATUS, entry.getValue().getStatus().name()); + jsonEntry.put(MESSAGE, entry.getValue().getMessage()); + jsonMessages.put(entry.getKey(), jsonEntry); + } + + object.put(MESSAGES, jsonMessages); + + return object; + } + + @Override + public void restore(JSONObject jsonObject) { + // Optional and accepting NULLs + id = (Long) jsonObject.get(ID); + + JSONObject jsonConnectorObject = (JSONObject)jsonObject.get(CONNECTOR); + + connectorValidation.put(ConnectorType.FROM, restoreValidation( + (JSONObject)jsonConnectorObject.get(FROM))); + connectorValidation.put(ConnectorType.TO, restoreValidation( + (JSONObject)jsonConnectorObject.get(TO))); + frameworkValidation = restoreValidation( + (JSONObject)jsonObject.get(FRAMEWORK)); + } + + public Validation restoreValidation(JSONObject jsonObject) { + JSONObject jsonMessages = (JSONObject) jsonObject.get(MESSAGES); + Map<Validation.FormInput, Validation.Message> messages + = new HashMap<Validation.FormInput, Validation.Message>(); + + for(Object key : jsonMessages.keySet()) { + JSONObject jsonMessage = (JSONObject) jsonMessages.get(key); + + Status status = Status.valueOf((String) jsonMessage.get(STATUS)); + String stringMessage = (String) jsonMessage.get(MESSAGE); + + Validation.Message message + = new Validation.Message(status, stringMessage); + + messages.put(new Validation.FormInput((String)key), message); + } + + Status status = Status.valueOf((String) jsonObject.get(STATUS)); + + return new Validation(status, messages); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/common/src/main/java/org/apache/sqoop/json/ValidationBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/json/ValidationBean.java b/common/src/main/java/org/apache/sqoop/json/ValidationBean.java deleted file mode 100644 index fd36825..0000000 --- a/common/src/main/java/org/apache/sqoop/json/ValidationBean.java +++ /dev/null @@ -1,143 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.json; - -import org.apache.sqoop.validation.Status; -import org.apache.sqoop.validation.Validation; -import org.json.simple.JSONObject; - -import java.util.HashMap; -import java.util.Map; - -/** - * Bean for sending validations across network. This bean will move two - * validation objects at one time - one for connector and second for framework - * part of validated entity. Optionally validation bean can also transfer - * created persistent id in case that new entity was created. - */ -public class ValidationBean implements JsonBean { - - private static final String ID = "id"; - private static final String FRAMEWORK = "framework"; - private static final String CONNECTOR = "connector"; - private static final String STATUS = "status"; - private static final String MESSAGE = "message"; - private static final String MESSAGES = "messages"; - - private Long id; - private Validation connectorValidation; - private Validation frameworkValidation; - - // For "extract" - public ValidationBean(Validation connector, Validation framework) { - this(); - - this.connectorValidation = connector; - this.frameworkValidation = framework; - } - - // For "restore" - public ValidationBean() { - id = null; - } - - public Validation getConnectorValidation() { - return connectorValidation; - } - - public Validation getFrameworkValidation() { - return frameworkValidation; - } - - public void setId(Long id) { - this.id = id; - } - - public Long getId() { - return id; - } - - @SuppressWarnings("unchecked") - public JSONObject extract(boolean skipSensitive) { - JSONObject object = new JSONObject(); - - // Optionally transfer id - if(id != null) { - object.put(ID, id); - } - - object.put(CONNECTOR, extractValidation(connectorValidation)); - object.put(FRAMEWORK, extractValidation(frameworkValidation)); - - return object; - } - - @SuppressWarnings("unchecked") - private JSONObject extractValidation(Validation validation) { - JSONObject object = new JSONObject(); - - object.put(STATUS, validation.getStatus().name()); - - JSONObject jsonMessages = new JSONObject(); - Map<Validation.FormInput, Validation.Message> messages = validation.getMessages(); - - for(Map.Entry<Validation.FormInput, Validation.Message> entry : messages.entrySet()) { - JSONObject jsonEntry = new JSONObject(); - jsonEntry.put(STATUS, entry.getValue().getStatus().name()); - jsonEntry.put(MESSAGE, entry.getValue().getMessage()); - jsonMessages.put(entry.getKey(), jsonEntry); - } - - object.put(MESSAGES, jsonMessages); - - return object; - } - - @Override - public void restore(JSONObject jsonObject) { - // Optional and accepting NULLs - id = (Long) jsonObject.get(ID); - - connectorValidation = restoreValidation( - (JSONObject)jsonObject.get(CONNECTOR)); - frameworkValidation = restoreValidation( - (JSONObject)jsonObject.get(FRAMEWORK)); - } - - public Validation restoreValidation(JSONObject jsonObject) { - JSONObject jsonMessages = (JSONObject) jsonObject.get(MESSAGES); - Map<Validation.FormInput, Validation.Message> messages - = new HashMap<Validation.FormInput, Validation.Message>(); - - for(Object key : jsonMessages.keySet()) { - JSONObject jsonMessage = (JSONObject) jsonMessages.get(key); - - Status status = Status.valueOf((String) jsonMessage.get(STATUS)); - String stringMessage = (String) jsonMessage.get(MESSAGE); - - Validation.Message message - = new Validation.Message(status, stringMessage); - - messages.put(new Validation.FormInput((String)key), message); - } - - Status status = Status.valueOf((String) jsonObject.get(STATUS)); - - return new Validation(status, messages); - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/common/src/main/java/org/apache/sqoop/model/MConnector.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/model/MConnector.java b/common/src/main/java/org/apache/sqoop/model/MConnector.java index 43fad27..a7518d2 100644 --- a/common/src/main/java/org/apache/sqoop/model/MConnector.java +++ b/common/src/main/java/org/apache/sqoop/model/MConnector.java @@ -17,8 +17,10 @@ */ package org.apache.sqoop.model; -import java.util.ArrayList; -import java.util.List; +import java.util.HashMap; +import java.util.Map; + +import org.apache.sqoop.common.ConnectorType; /** * Connector metadata. @@ -26,14 +28,23 @@ import java.util.List; * Includes unique id that identifies connector in metadata store, unique human * readable name, corresponding name and all forms for all supported job types. */ -public final class MConnector extends MFramework { +public final class MConnector extends MPersistableEntity implements MClonable { private final String uniqueName; private final String className; + private final MConnectionForms connectionForms; + private final Map<ConnectorType, MJobForms> jobForms; + String version; + + public MConnector(String uniqueName, String className, + String version, MConnectionForms connectionForms, + MJobForms fromJobForms, MJobForms toJobForms) { + this.jobForms = new HashMap<ConnectorType, MJobForms>(); - public MConnector(String uniqueName, String className, String version, - MConnectionForms connectionForms, List<MJobForms> jobForms) { - super(connectionForms, jobForms, version); + this.version = version; + this.connectionForms = connectionForms; + this.jobForms.put(ConnectorType.FROM, fromJobForms); + this.jobForms.put(ConnectorType.TO, toJobForms); if (uniqueName == null || className == null) { throw new NullPointerException(); @@ -57,10 +68,8 @@ public final class MConnector extends MFramework { sb.append(uniqueName).append(":").append(getPersistenceId()).append(":"); sb.append(className); sb.append(", ").append(getConnectionForms().toString()); - for(MJobForms entry: getAllJobsForms().values()) { - sb.append(entry.toString()); - } - + sb.append(", ").append(getJobForms(ConnectorType.FROM).toString()); + sb.append(", ").append(getJobForms(ConnectorType.TO).toString()); return sb.toString(); } @@ -78,32 +87,49 @@ public final class MConnector extends MFramework { return uniqueName.equals(mc.uniqueName) && className.equals(mc.className) && version.equals(mc.version) - && super.equals(other); + && connectionForms.equals(mc.getConnectionForms()) + && jobForms.get(ConnectorType.FROM).equals(mc.getJobForms(ConnectorType.FROM)) + && jobForms.get(ConnectorType.TO).equals(mc.getJobForms(ConnectorType.TO)); } @Override public int hashCode() { - int result = super.hashCode(); + int result = getConnectionForms().hashCode(); + result = 31 * result + getJobForms(ConnectorType.FROM).hashCode(); + result = 31 * result + getJobForms(ConnectorType.TO).hashCode(); + result = 31 * result + version.hashCode(); result = 31 * result + uniqueName.hashCode(); result = 31 * result + className.hashCode(); - return result; } - @Override public MConnector clone(boolean cloneWithValue) { //Connector never have any values filled cloneWithValue = false; - List<MJobForms> copyJobForms = null; - if(this.getAllJobsForms()!=null) { - copyJobForms = new ArrayList<MJobForms>(); - for(MJobForms entry: this.getAllJobsForms().values()) { - copyJobForms.add(entry.clone(cloneWithValue)); - } - } - MConnector copy = new MConnector(this.getUniqueName(), this.getClassName(), this.getVersion(), - this.getConnectionForms().clone(cloneWithValue), copyJobForms); + MConnector copy = new MConnector( + this.getUniqueName(), + this.getClassName(), + this.getVersion(), + this.getConnectionForms().clone(cloneWithValue), + this.getJobForms(ConnectorType.FROM).clone(cloneWithValue), + this.getJobForms(ConnectorType.TO).clone(cloneWithValue)); copy.setPersistenceId(this.getPersistenceId()); return copy; } + + public MConnectionForms getConnectionForms() { + return connectionForms; + } + + public MJobForms getJobForms(ConnectorType type) { + return jobForms.get(type); + } + + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/common/src/main/java/org/apache/sqoop/model/MFramework.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/model/MFramework.java b/common/src/main/java/org/apache/sqoop/model/MFramework.java index c742459..580db9c 100644 --- a/common/src/main/java/org/apache/sqoop/model/MFramework.java +++ b/common/src/main/java/org/apache/sqoop/model/MFramework.java @@ -17,38 +17,21 @@ */ package org.apache.sqoop.model; -import org.apache.sqoop.common.SqoopException; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - /** - * Metadata describing framework options for connection and job for each - * supported job type. + * Metadata describing framework options for connection and jobForms for each + * supported jobForms type. */ public class MFramework extends MPersistableEntity implements MClonable { private final MConnectionForms connectionForms; - private final Map<MJob.Type, MJobForms> jobs; + private final MJobForms jobForms; String version; - public MFramework(MConnectionForms connectionForms, List<MJobForms> jobForms, + public MFramework(MConnectionForms connectionForms, MJobForms jobForms, String version) { this.version = version; this.connectionForms = connectionForms; - this.jobs = new HashMap<MJob.Type, MJobForms>(); - - for (MJobForms job : jobForms) { - MJob.Type type = job.getType(); - - if(this.jobs.containsKey(type)) { - throw new SqoopException(ModelError.MODEL_001, "Duplicate entry for" - + " jobForms type " + job.getType().name()); - } - this.jobs.put(type, job); - } + this.jobForms = jobForms; } @Override @@ -57,9 +40,7 @@ public class MFramework extends MPersistableEntity implements MClonable { sb.append(getPersistenceId()).append(":"); sb.append("version = " + version); sb.append(", ").append(connectionForms.toString()); - for(MJobForms entry: jobs.values()) { - sb.append(entry.toString()); - } + sb.append(jobForms.toString()); return sb.toString(); } @@ -77,16 +58,13 @@ public class MFramework extends MPersistableEntity implements MClonable { MFramework mo = (MFramework) other; return version.equals(mo.getVersion()) && connectionForms.equals(mo.connectionForms) && - jobs.equals(mo.jobs); + jobForms.equals(mo.jobForms); } @Override public int hashCode() { int result = connectionForms.hashCode(); - - for(MJobForms entry: jobs.values()) { - result = 31 * result + entry.hashCode(); - } + result = 31 * result + jobForms.hashCode(); result = 31 * result + version.hashCode(); return result; } @@ -95,27 +73,16 @@ public class MFramework extends MPersistableEntity implements MClonable { return connectionForms; } - public Map<MJob.Type, MJobForms> getAllJobsForms() { - return jobs; - } - - public MJobForms getJobForms(MJob.Type type) { - return jobs.get(type); + public MJobForms getJobForms() { + return jobForms; } @Override public MFramework clone(boolean cloneWithValue) { //Framework never have any values filled cloneWithValue = false; - List<MJobForms> copyJobForms = null; - if(this.getAllJobsForms()!=null) { - copyJobForms = new ArrayList<MJobForms>(); - for(MJobForms entry: this.getAllJobsForms().values()) { - copyJobForms.add(entry.clone(cloneWithValue)); - } - } MFramework copy = new MFramework(this.getConnectionForms().clone(cloneWithValue), - copyJobForms, this.version); + this.getJobForms().clone(cloneWithValue), this.version); copy.setPersistenceId(this.getPersistenceId()); return copy; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/common/src/main/java/org/apache/sqoop/model/MJob.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/model/MJob.java b/common/src/main/java/org/apache/sqoop/model/MJob.java index 849168d..6802a74 100644 --- a/common/src/main/java/org/apache/sqoop/model/MJob.java +++ b/common/src/main/java/org/apache/sqoop/model/MJob.java @@ -17,19 +17,16 @@ */ package org.apache.sqoop.model; -import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.common.ConnectorType; + +import java.util.HashMap; +import java.util.Map; /** * Model describing entire job object including both connector and * framework part. */ public class MJob extends MAccountableEntity implements MClonable { - - public static enum Type { - IMPORT, - EXPORT, - } - /** * Connector reference. * @@ -37,46 +34,47 @@ public class MJob extends MAccountableEntity implements MClonable { * dependency through connection object, but having this dependency explicitly * carried along helps a lot. */ - private final long connectorId; + private final Map<ConnectorType, Long> connectorIds; /** - * Corresponding connection object. + * Corresponding connection objects for connector. */ - private final long connectionId; + private final Map<ConnectorType, Long> connectionIds; /** * User name for this object */ private String name; - /** - * Job type - */ - private final Type type; - - private final MJobForms connectorPart; + private final Map<ConnectorType, MJobForms> connectorParts; private final MJobForms frameworkPart; /** * Default constructor to build new MJob model. * - * @param connectorId Connector id - * @param connectionId Connection id - * @param type Job type - * @param connectorPart Connector forms + * @param fromConnectorId Connector id + * @param fromConnectionId Connection id + * @param fromPart From Connector forms + * @param toPart To Connector forms * @param frameworkPart Framework forms */ - public MJob(long connectorId, - long connectionId, - Type type, - MJobForms connectorPart, + public MJob(long fromConnectorId, + long toConnectorId, + long fromConnectionId, + long toConnectionId, + MJobForms fromPart, + MJobForms toPart, MJobForms frameworkPart) { - this.connectorId = connectorId; - this.connectionId = connectionId; - this.type = type; - this.connectorPart = connectorPart; + connectorIds = new HashMap<ConnectorType, Long>(); + connectorIds.put(ConnectorType.FROM, fromConnectorId); + connectorIds.put(ConnectorType.TO, toConnectorId); + connectionIds = new HashMap<ConnectorType, Long>(); + connectionIds.put(ConnectorType.FROM, fromConnectionId); + connectionIds.put(ConnectorType.TO, toConnectionId); + connectorParts = new HashMap<ConnectorType, MJobForms>(); + connectorParts.put(ConnectorType.FROM, fromPart); + connectorParts.put(ConnectorType.TO, toPart); this.frameworkPart = frameworkPart; - verifyFormsOfSameType(); } /** @@ -85,7 +83,10 @@ public class MJob extends MAccountableEntity implements MClonable { * @param other MConnection model to copy */ public MJob(MJob other) { - this(other, other.connectorPart.clone(true), other.frameworkPart.clone(true)); + this(other, + other.getConnectorPart(ConnectorType.FROM).clone(true), + other.getConnectorPart(ConnectorType.TO).clone(true), + other.frameworkPart.clone(true)); } /** @@ -95,34 +96,31 @@ public class MJob extends MAccountableEntity implements MClonable { * used otherwise. * * @param other MJob model to copy - * @param connectorPart Connector forms + * @param fromPart From Connector forms * @param frameworkPart Framework forms + * @param toPart To Connector forms */ - public MJob(MJob other, MJobForms connectorPart, MJobForms frameworkPart) { + public MJob(MJob other, MJobForms fromPart, MJobForms frameworkPart, MJobForms toPart) { super(other); - this.connectionId = other.connectionId; - this.connectorId = other.connectorId; - this.type = other.type; + connectorIds = new HashMap<ConnectorType, Long>(); + connectorIds.put(ConnectorType.FROM, other.getConnectorId(ConnectorType.FROM)); + connectorIds.put(ConnectorType.TO, other.getConnectorId(ConnectorType.TO)); + connectionIds = new HashMap<ConnectorType, Long>(); + connectorIds.put(ConnectorType.FROM, other.getConnectionId(ConnectorType.FROM)); + connectorIds.put(ConnectorType.TO, other.getConnectionId(ConnectorType.TO)); + connectorParts = new HashMap<ConnectorType, MJobForms>(); + connectorParts.put(ConnectorType.FROM, fromPart); + connectorParts.put(ConnectorType.TO, toPart); this.name = other.name; - this.connectorPart = connectorPart; this.frameworkPart = frameworkPart; - verifyFormsOfSameType(); - } - - private void verifyFormsOfSameType() { - if (type != connectorPart.getType() || type != frameworkPart.getType()) { - throw new SqoopException(ModelError.MODEL_002, - "Incompatible types, job: " + type.name() - + ", connector part: " + connectorPart.getType().name() - + ", framework part: " + frameworkPart.getType().name() - ); - } } @Override public String toString() { - StringBuilder sb = new StringBuilder("job connector-part: "); - sb.append(connectorPart).append(", framework-part: ").append(frameworkPart); + StringBuilder sb = new StringBuilder("job"); + sb.append(" connector-from-part: ").append(getConnectorPart(ConnectorType.FROM)); + sb.append(", connector-to-part: ").append(getConnectorPart(ConnectorType.TO)); + sb.append(", framework-part: ").append(frameworkPart); return sb.toString(); } @@ -135,32 +133,35 @@ public class MJob extends MAccountableEntity implements MClonable { this.name = name; } - public long getConnectionId() { - return connectionId; + public long getConnectionId(ConnectorType type) { + return connectionIds.get(type); } - public long getConnectorId() { - return connectorId; + public long getConnectorId(ConnectorType type) { + return connectorIds.get(type); } - public MJobForms getConnectorPart() { - return connectorPart; + public MJobForms getConnectorPart(ConnectorType type) { + return connectorParts.get(type); } public MJobForms getFrameworkPart() { return frameworkPart; } - public Type getType() { - return type; - } - @Override public MJob clone(boolean cloneWithValue) { if(cloneWithValue) { return new MJob(this); } else { - return new MJob(connectorId, connectionId, type, connectorPart.clone(false), frameworkPart.clone(false)); + return new MJob( + getConnectorId(ConnectorType.FROM), + getConnectorId(ConnectorType.TO), + getConnectionId(ConnectorType.FROM), + getConnectionId(ConnectorType.TO), + getConnectorPart(ConnectorType.FROM).clone(false), + getConnectorPart(ConnectorType.TO).clone(false), + frameworkPart.clone(false)); } } @@ -175,11 +176,13 @@ public class MJob extends MAccountableEntity implements MClonable { } MJob job = (MJob)object; - return (job.connectorId == this.connectorId) - && (job.connectionId == this.connectionId) + return (job.getConnectorId(ConnectorType.FROM) == this.getConnectorId(ConnectorType.FROM)) + && (job.getConnectorId(ConnectorType.TO) == this.getConnectorId(ConnectorType.TO)) + && (job.getConnectionId(ConnectorType.FROM) == this.getConnectionId(ConnectorType.FROM)) + && (job.getConnectionId(ConnectorType.TO) == this.getConnectionId(ConnectorType.TO)) && (job.getPersistenceId() == this.getPersistenceId()) - && (job.type.equals(this.type)) - && (job.connectorPart.equals(this.connectorPart)) + && (job.getConnectorPart(ConnectorType.FROM).equals(this.getConnectorPart(ConnectorType.FROM))) + && (job.getConnectorPart(ConnectorType.TO).equals(this.getConnectorPart(ConnectorType.TO))) && (job.frameworkPart.equals(this.frameworkPart)); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/common/src/main/java/org/apache/sqoop/model/MJobForms.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/model/MJobForms.java b/common/src/main/java/org/apache/sqoop/model/MJobForms.java index f697023..08b9a78 100644 --- a/common/src/main/java/org/apache/sqoop/model/MJobForms.java +++ b/common/src/main/java/org/apache/sqoop/model/MJobForms.java @@ -20,28 +20,12 @@ package org.apache.sqoop.model; import java.util.List; /** - * Metadata describing all required information to build up an job - * object for one part. Both connector and framework need to supply this object - * to build up entire job. + * Metadata describing all required information to build a job + * object with two connectors and a framework. */ public class MJobForms extends MFormList { - - private final MJob.Type type; - - public MJobForms(MJob.Type type, List<MForm> forms) { + public MJobForms(List<MForm> forms) { super(forms); - this.type = type; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder("Job type: ").append(type.name()); - sb.append(super.toString()); - return sb.toString(); - } - - public MJob.Type getType() { - return type; } @Override @@ -55,19 +39,17 @@ public class MJobForms extends MFormList { } MJobForms mj = (MJobForms) other; - return type.equals(mj.type) && super.equals(mj); + return super.equals(mj); } @Override public int hashCode() { - int result = super.hashCode(); - result = 31 * result + type.hashCode(); - return result; + return super.hashCode(); } @Override public MJobForms clone(boolean cloneWithValue) { - MJobForms copy = new MJobForms(this.type, super.clone(cloneWithValue).getForms()); + MJobForms copy = new MJobForms(super.clone(cloneWithValue).getForms()); return copy; } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java index 298288e..1473dba 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java @@ -20,33 +20,33 @@ package org.apache.sqoop.connector.jdbc; import java.util.Locale; import java.util.ResourceBundle; +import org.apache.sqoop.common.ConnectorType; import org.apache.sqoop.common.VersionInfo; import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; import org.apache.sqoop.connector.idf.IntermediateDataFormat; import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; -import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration; -import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration; +import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration; +import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration; import org.apache.sqoop.connector.spi.MetadataUpgrader; -import org.apache.sqoop.job.etl.Exporter; -import org.apache.sqoop.job.etl.Importer; +import org.apache.sqoop.job.etl.From; +import org.apache.sqoop.job.etl.To; import org.apache.sqoop.connector.spi.SqoopConnector; -import org.apache.sqoop.model.MJob; import org.apache.sqoop.validation.Validator; public class GenericJdbcConnector extends SqoopConnector { private static GenericJdbcValidator genericJdbcValidator = new GenericJdbcValidator(); - private static final Importer IMPORTER = new Importer( - GenericJdbcImportInitializer.class, - GenericJdbcImportPartitioner.class, - GenericJdbcImportExtractor.class, - GenericJdbcImportDestroyer.class); + private static final From FROM = new From( + GenericJdbcFromInitializer.class, + GenericJdbcPartitioner.class, + GenericJdbcExtractor.class, + GenericJdbcFromDestroyer.class); - private static final Exporter EXPORTER = new Exporter( - GenericJdbcExportInitializer.class, - GenericJdbcExportLoader.class, - GenericJdbcExportDestroyer.class); + private static final To TO = new To( + GenericJdbcToInitializer.class, + GenericJdbcLoader.class, + GenericJdbcToDestroyer.class); /** @@ -72,25 +72,25 @@ public class GenericJdbcConnector extends SqoopConnector { } @Override - public Class getJobConfigurationClass(MJob.Type jobType) { + public Class getJobConfigurationClass(ConnectorType jobType) { switch (jobType) { - case IMPORT: - return ImportJobConfiguration.class; - case EXPORT: - return ExportJobConfiguration.class; + case FROM: + return FromJobConfiguration.class; + case TO: + return ToJobConfiguration.class; default: return null; } } @Override - public Importer getImporter() { - return IMPORTER; + public From getFrom() { + return FROM; } @Override - public Exporter getExporter() { - return EXPORTER; + public To getTo() { + return TO; } @Override http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java index abcc89d..a51fb7d 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java @@ -42,8 +42,10 @@ public final class GenericJdbcConnectorConstants { public static final String CONNECTOR_JDBC_PARTITION_MAXVALUE = PREFIX_CONNECTOR_JDBC_CONFIG + "partition.maxvalue"; - public static final String CONNECTOR_JDBC_DATA_SQL = - PREFIX_CONNECTOR_JDBC_CONFIG + "data.sql"; + public static final String CONNECTOR_FROM_JDBC_DATA_SQL = + PREFIX_CONNECTOR_JDBC_CONFIG + "from.data.sql"; + public static final String CONNECTOR_TO_JDBC_DATA_SQL = + PREFIX_CONNECTOR_JDBC_CONFIG + "to.data.sql"; public static final String SQL_CONDITIONS_TOKEN = "${CONDITIONS}"; http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java deleted file mode 100644 index c5faa09..0000000 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.connector.jdbc; - -import org.apache.log4j.Logger; -import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; -import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration; -import org.apache.sqoop.job.etl.Destroyer; -import org.apache.sqoop.job.etl.DestroyerContext; - -public class GenericJdbcExportDestroyer extends Destroyer<ConnectionConfiguration, ExportJobConfiguration> { - - private static final Logger LOG = Logger.getLogger(GenericJdbcExportDestroyer.class); - - @Override - public void destroy(DestroyerContext context, ConnectionConfiguration connection, ExportJobConfiguration job) { - LOG.info("Running generic JDBC connector destroyer"); - - final String tableName = job.table.tableName; - final String stageTableName = job.table.stageTableName; - final boolean stageEnabled = stageTableName != null && - stageTableName.length() > 0; - if(stageEnabled) { - moveDataToDestinationTable(connection, - context.isSuccess(), stageTableName, tableName); - } - } - - private void moveDataToDestinationTable(ConnectionConfiguration connectorConf, - boolean success, String stageTableName, String tableName) { - GenericJdbcExecutor executor = - new GenericJdbcExecutor(connectorConf.connection.jdbcDriver, - connectorConf.connection.connectionString, - connectorConf.connection.username, - connectorConf.connection.password); - - if(success) { - LOG.info("Job completed, transferring data from stage table to " + - "destination table."); - executor.migrateData(stageTableName, tableName); - } else { - LOG.warn("Job failed, clearing stage table."); - executor.deleteTableData(stageTableName); - } - } - -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java deleted file mode 100644 index 80253be..0000000 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java +++ /dev/null @@ -1,222 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.connector.jdbc; - -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.util.LinkedList; -import java.util.List; - -import org.apache.commons.lang.StringUtils; -import org.apache.log4j.Logger; -import org.apache.sqoop.common.MutableContext; -import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; -import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration; -import org.apache.sqoop.connector.jdbc.util.SqlTypesUtils; -import org.apache.sqoop.job.etl.Initializer; -import org.apache.sqoop.job.etl.InitializerContext; -import org.apache.sqoop.schema.Schema; -import org.apache.sqoop.schema.type.Column; -import org.apache.sqoop.utils.ClassUtils; - -public class GenericJdbcExportInitializer extends Initializer<ConnectionConfiguration, ExportJobConfiguration> { - - private GenericJdbcExecutor executor; - private static final Logger LOG = - Logger.getLogger(GenericJdbcExportInitializer.class); - - @Override - public void initialize(InitializerContext context, ConnectionConfiguration connection, ExportJobConfiguration job) { - configureJdbcProperties(context.getContext(), connection, job); - try { - configureTableProperties(context.getContext(), connection, job); - } finally { - executor.close(); - } - } - - @Override - public List<String> getJars(InitializerContext context, ConnectionConfiguration connection, ExportJobConfiguration job) { - List<String> jars = new LinkedList<String>(); - - jars.add(ClassUtils.jarForClass(connection.connection.jdbcDriver)); - - return jars; - } - - @Override - public Schema getSchema(InitializerContext context, ConnectionConfiguration connectionConfiguration, ExportJobConfiguration exportJobConfiguration) { - configureJdbcProperties(context.getContext(), connectionConfiguration, exportJobConfiguration); - - String schemaName = exportJobConfiguration.table.tableName; - - if (schemaName == null) { - throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0019, - "Table name extraction not supported yet."); - } - - if(exportJobConfiguration.table.schemaName != null) { - schemaName = exportJobConfiguration.table.schemaName + "." + schemaName; - } - - Schema schema = new Schema(schemaName); - ResultSet rs = null; - ResultSetMetaData rsmt = null; - try { - rs = executor.executeQuery("SELECT * FROM " + schemaName + " WHERE 1 = 0"); - - rsmt = rs.getMetaData(); - for (int i = 1 ; i <= rsmt.getColumnCount(); i++) { - Column column = SqlTypesUtils.sqlTypeToAbstractType(rsmt.getColumnType(i)); - - String columnName = rsmt.getColumnName(i); - if (columnName == null || columnName.equals("")) { - columnName = rsmt.getColumnLabel(i); - if (null == columnName) { - columnName = "Column " + i; - } - } - - column.setName(columnName); - schema.addColumn(column); - } - - return schema; - } catch (SQLException e) { - throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0016, e); - } finally { - if(rs != null) { - try { - rs.close(); - } catch (SQLException e) { - LOG.info("Ignoring exception while closing ResultSet", e); - } - } - } - } - - private void configureJdbcProperties(MutableContext context, ConnectionConfiguration connectionConfig, ExportJobConfiguration jobConfig) { - String driver = connectionConfig.connection.jdbcDriver; - String url = connectionConfig.connection.connectionString; - String username = connectionConfig.connection.username; - String password = connectionConfig.connection.password; - - assert driver != null; - assert url != null; - - executor = new GenericJdbcExecutor(driver, url, username, password); - } - - private void configureTableProperties(MutableContext context, ConnectionConfiguration connectionConfig, ExportJobConfiguration jobConfig) { - String dataSql; - - String schemaName = jobConfig.table.schemaName; - String tableName = jobConfig.table.tableName; - String stageTableName = jobConfig.table.stageTableName; - boolean clearStageTable = jobConfig.table.clearStageTable == null ? - false : jobConfig.table.clearStageTable; - final boolean stageEnabled = - stageTableName != null && stageTableName.length() > 0; - String tableSql = jobConfig.table.sql; - String tableColumns = jobConfig.table.columns; - - if (tableName != null && tableSql != null) { - // when both table name and table sql are specified: - throw new SqoopException( - GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007); - - } else if (tableName != null) { - // when table name is specified: - if(stageEnabled) { - LOG.info("Stage has been enabled."); - LOG.info("Use stageTable: " + stageTableName + - " with clearStageTable: " + clearStageTable); - - if(clearStageTable) { - executor.deleteTableData(stageTableName); - } else { - long stageRowCount = executor.getTableRowCount(stageTableName); - if(stageRowCount > 0) { - throw new SqoopException( - GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0017); - } - } - } - - // For databases that support schemas (IE: postgresql). - final String tableInUse = stageEnabled ? stageTableName : tableName; - String fullTableName = (schemaName == null) ? - executor.delimitIdentifier(tableInUse) : - executor.delimitIdentifier(schemaName) + - "." + executor.delimitIdentifier(tableInUse); - - if (tableColumns == null) { - String[] columns = executor.getQueryColumns("SELECT * FROM " - + fullTableName + " WHERE 1 = 0"); - StringBuilder builder = new StringBuilder(); - builder.append("INSERT INTO "); - builder.append(fullTableName); - builder.append(" VALUES (?"); - for (int i = 1; i < columns.length; i++) { - builder.append(",?"); - } - builder.append(")"); - dataSql = builder.toString(); - - } else { - String[] columns = StringUtils.split(tableColumns, ','); - StringBuilder builder = new StringBuilder(); - builder.append("INSERT INTO "); - builder.append(fullTableName); - builder.append(" ("); - builder.append(tableColumns); - builder.append(") VALUES (?"); - for (int i = 1; i < columns.length; i++) { - builder.append(",?"); - } - builder.append(")"); - dataSql = builder.toString(); - } - } else if (tableSql != null) { - // when table sql is specified: - - if (tableSql.indexOf( - GenericJdbcConnectorConstants.SQL_PARAMETER_MARKER) == -1) { - // make sure parameter marker is in the specified sql - throw new SqoopException( - GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0013); - } - - if (tableColumns == null) { - dataSql = tableSql; - } else { - throw new SqoopException( - GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0014); - } - } else { - // when neither are specified: - throw new SqoopException( - GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008); - } - - context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL, - dataSql.toString()); - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java deleted file mode 100644 index 15e7101..0000000 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.connector.jdbc; - -import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; -import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration; -import org.apache.sqoop.job.etl.Loader; -import org.apache.sqoop.job.etl.LoaderContext; - -public class GenericJdbcExportLoader extends Loader<ConnectionConfiguration, ExportJobConfiguration> { - - public static final int DEFAULT_ROWS_PER_BATCH = 100; - public static final int DEFAULT_BATCHES_PER_TRANSACTION = 100; - private int rowsPerBatch = DEFAULT_ROWS_PER_BATCH; - private int batchesPerTransaction = DEFAULT_BATCHES_PER_TRANSACTION; - - @Override - public void load(LoaderContext context, ConnectionConfiguration connection, ExportJobConfiguration job) throws Exception{ - String driver = connection.connection.jdbcDriver; - String url = connection.connection.connectionString; - String username = connection.connection.username; - String password = connection.connection.password; - GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password); - executor.setAutoCommit(false); - - String sql = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL); - executor.beginBatch(sql); - try { - int numberOfRows = 0; - int numberOfBatches = 0; - Object[] array; - - while ((array = context.getDataReader().readArrayRecord()) != null) { - numberOfRows++; - executor.addBatch(array); - - if (numberOfRows == rowsPerBatch) { - numberOfBatches++; - if (numberOfBatches == batchesPerTransaction) { - executor.executeBatch(true); - numberOfBatches = 0; - } else { - executor.executeBatch(false); - } - numberOfRows = 0; - } - } - - if (numberOfRows != 0 || numberOfBatches != 0) { - // execute and commit the remaining rows - executor.executeBatch(true); - } - - executor.endBatch(); - - } finally { - executor.close(); - } - } - -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java new file mode 100644 index 0000000..2428199 --- /dev/null +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.jdbc; + +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +import org.apache.log4j.Logger; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration; +import org.apache.sqoop.job.etl.ExtractorContext; +import org.apache.sqoop.job.etl.Extractor; + +public class GenericJdbcExtractor extends Extractor<ConnectionConfiguration, FromJobConfiguration, GenericJdbcPartition> { + + public static final Logger LOG = Logger.getLogger(GenericJdbcExtractor.class); + + private long rowsRead = 0; + @Override + public void extract(ExtractorContext context, ConnectionConfiguration connection, FromJobConfiguration job, GenericJdbcPartition partition) { + String driver = connection.connection.jdbcDriver; + String url = connection.connection.connectionString; + String username = connection.connection.username; + String password = connection.connection.password; + GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password); + + String query = context.getString(GenericJdbcConnectorConstants.CONNECTOR_FROM_JDBC_DATA_SQL); + String conditions = partition.getConditions(); + query = query.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, conditions); + LOG.info("Using query: " + query); + + rowsRead = 0; + ResultSet resultSet = executor.executeQuery(query); + + try { + ResultSetMetaData metaData = resultSet.getMetaData(); + int column = metaData.getColumnCount(); + while (resultSet.next()) { + Object[] array = new Object[column]; + for (int i = 0; i< column; i++) { + array[i] = resultSet.getObject(i + 1) == null ? GenericJdbcConnectorConstants.SQL_NULL_VALUE + : resultSet.getObject(i + 1); + } + context.getDataWriter().writeArrayRecord(array); + rowsRead++; + } + } catch (SQLException e) { + throw new SqoopException( + GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0004, e); + + } finally { + executor.close(); + } + } + + @Override + public long getRowsRead() { + return rowsRead; + } + +}
