http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/shell/src/main/java/org/apache/sqoop/shell/core/Constants.java ---------------------------------------------------------------------- diff --git a/shell/src/main/java/org/apache/sqoop/shell/core/Constants.java b/shell/src/main/java/org/apache/sqoop/shell/core/Constants.java index 475f41c..908b44d 100644 --- a/shell/src/main/java/org/apache/sqoop/shell/core/Constants.java +++ b/shell/src/main/java/org/apache/sqoop/shell/core/Constants.java @@ -35,6 +35,8 @@ public class Constants { // Options public static final String OPT_XID = "xid"; + public static final String OPT_FXID = "fxid"; + public static final String OPT_TXID = "txid"; public static final String OPT_ALL = "all"; public static final String OPT_JID = "jid"; public static final String OPT_CID = "cid"; @@ -54,6 +56,8 @@ public class Constants { public static final String OPT_DETAIL = "detail"; public static final char OPT_XID_CHAR = 'x'; + public static final char OPT_FXID_CHAR = 'f'; + public static final char OPT_TXID_CHAR = 't'; public static final char OPT_ALL_CHAR = 'a'; public static final char OPT_JID_CHAR = 'j'; public static final char OPT_CID_CHAR = 'c'; @@ -143,12 +147,14 @@ public class Constants { "args.function.unknown"; public static final String RES_ARGS_XID_MISSING = "args.xid_missing"; + public static final String RES_ARGS_FXID_MISSING = + "args.fxid_missing"; + public static final String RES_ARGS_TXID_MISSING = + "args.txid_missing"; public static final String RES_ARGS_JID_MISSING = "args.jid_missing"; public static final String RES_ARGS_CID_MISSING = "args.cid_missing"; - public static final String RES_ARGS_TYPE_MISSING = - "args.type_missing"; public static final String RES_ARGS_NAME_MISSING = "args.name_missing"; public static final String RES_ARGS_VALUE_MISSING = @@ -160,8 +166,6 @@ public class Constants { "prompt.job_id"; public static final String RES_CONNECTOR_ID = "prompt.connector_id"; - public static final String RES_PROMPT_JOB_TYPE = - "prompt.job_type"; public static final String RES_PROMPT_UPDATE_CONN_METADATA = "prompt.update_conn_metadata"; public static final String RES_PROMPT_UPDATE_JOB_METADATA = @@ -375,10 +379,12 @@ public class Constants { "table.header.version"; public static final String RES_TABLE_HEADER_CLASS = "table.header.class"; - public static final String RES_TABLE_HEADER_TYPE = - "table.header.type"; public static final String RES_TABLE_HEADER_CONNECTOR = "table.header.connector"; + public static final String RES_TABLE_HEADER_FROM_CONNECTOR = + "table.header.connector.from"; + public static final String RES_TABLE_HEADER_TO_CONNECTOR = + "table.header.connector.to"; public static final String RES_TABLE_HEADER_JOB_ID = "table.header.jid"; public static final String RES_TABLE_HEADER_EXTERNAL_ID = @@ -390,14 +396,10 @@ public class Constants { public static final String RES_TABLE_HEADER_ENABLED = "table.header.enabled"; - public static final String RES_FORMDISPLAYER_SUPPORTED_JOBTYPE = - "formdisplayer.supported_job_types"; public static final String RES_FORMDISPLAYER_CONNECTION = "formdisplayer.connection"; public static final String RES_FORMDISPLAYER_JOB = "formdisplayer.job"; - public static final String RES_FORMDISPLAYER_FORM_JOBTYPE = - "formdisplayer.forms_jobtype"; public static final String RES_FORMDISPLAYER_FORM = "formdisplayer.form"; public static final String RES_FORMDISPLAYER_NAME =
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/shell/src/main/java/org/apache/sqoop/shell/utils/FormDisplayer.java ---------------------------------------------------------------------- diff --git a/shell/src/main/java/org/apache/sqoop/shell/utils/FormDisplayer.java b/shell/src/main/java/org/apache/sqoop/shell/utils/FormDisplayer.java index 56e0b4e..44196e6 100644 --- a/shell/src/main/java/org/apache/sqoop/shell/utils/FormDisplayer.java +++ b/shell/src/main/java/org/apache/sqoop/shell/utils/FormDisplayer.java @@ -18,9 +18,11 @@ package org.apache.sqoop.shell.utils; import org.apache.commons.lang.StringUtils; +import org.apache.sqoop.common.ConnectorType; import org.apache.sqoop.model.MAccountableEntity; import org.apache.sqoop.model.MBooleanInput; import org.apache.sqoop.model.MConnection; +import org.apache.sqoop.model.MConnector; import org.apache.sqoop.model.MEnumInput; import org.apache.sqoop.model.MForm; import org.apache.sqoop.model.MFramework; @@ -28,7 +30,6 @@ import org.apache.sqoop.model.MInput; import org.apache.sqoop.model.MInputType; import org.apache.sqoop.model.MIntegerInput; import org.apache.sqoop.model.MJob; -import org.apache.sqoop.model.MJobForms; import org.apache.sqoop.model.MMapInput; import org.apache.sqoop.model.MStringInput; import org.apache.sqoop.shell.core.Constants; @@ -49,21 +50,34 @@ public final class FormDisplayer { public static void displayFormMetadataDetails(MFramework framework, ResourceBundle bundle) { - print(" %s: ", resourceString(Constants.RES_FORMDISPLAYER_SUPPORTED_JOBTYPE)); - println(framework.getAllJobsForms().keySet().toString()); - displayFormsMetadata( framework.getConnectionForms().getForms(), resourceString(Constants.RES_FORMDISPLAYER_CONNECTION), bundle); - for (MJobForms jobForms : framework.getAllJobsForms().values()) { - print(" %s ", resourceString(Constants.RES_FORMDISPLAYER_FORM_JOBTYPE)); - print(jobForms.getType().name()); - println(":"); + displayFormsMetadata( + framework.getJobForms().getForms(), + resourceString(Constants.RES_FORMDISPLAYER_JOB), + bundle); + } - displayFormsMetadata(jobForms.getForms(), resourceString(Constants.RES_FORMDISPLAYER_JOB), bundle); - } + public static void displayFormMetadataDetails(MConnector connector, + ResourceBundle bundle) { + displayFormsMetadata( + connector.getConnectionForms().getForms(), + resourceString(Constants.RES_FORMDISPLAYER_CONNECTION), + bundle); + + // @TODO(Abe): Validate From/To output is correct. + displayFormsMetadata( + connector.getJobForms(ConnectorType.FROM).getForms(), + resourceString(Constants.RES_FORMDISPLAYER_JOB), + bundle); + + displayFormsMetadata( + connector.getJobForms(ConnectorType.TO).getForms(), + resourceString(Constants.RES_FORMDISPLAYER_JOB), + bundle); } public static void displayFormsMetadata(List<MForm> forms, @@ -139,8 +153,9 @@ public final class FormDisplayer { formList.addAll(connection.getFrameworkPart().getForms()); } else if(entity instanceof MJob) { MJob job = (MJob) entity; - formList.addAll(job.getConnectorPart().getForms()); + formList.addAll(job.getConnectorPart(ConnectorType.FROM).getForms()); formList.addAll(job.getFrameworkPart().getForms()); + formList.addAll(job.getConnectorPart(ConnectorType.TO).getForms()); } for(MForm form : formList) { if(form.getValidationStatus() == Status.ACCEPTABLE) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/shell/src/main/java/org/apache/sqoop/shell/utils/FormFiller.java ---------------------------------------------------------------------- diff --git a/shell/src/main/java/org/apache/sqoop/shell/utils/FormFiller.java b/shell/src/main/java/org/apache/sqoop/shell/utils/FormFiller.java index c491ae5..cc75d94 100644 --- a/shell/src/main/java/org/apache/sqoop/shell/utils/FormFiller.java +++ b/shell/src/main/java/org/apache/sqoop/shell/utils/FormFiller.java @@ -21,6 +21,7 @@ import jline.ConsoleReader; import org.apache.commons.cli.CommandLine; import org.apache.commons.lang.StringUtils; +import org.apache.sqoop.common.ConnectorType; import org.apache.sqoop.model.MBooleanInput; import org.apache.sqoop.model.MConnection; import org.apache.sqoop.model.MEnumInput; @@ -55,7 +56,7 @@ public final class FormFiller { /** * Fill job object based on CLI options. * - * @param reader Associated console reader object + * @param line Associated console reader object * @param job Job that user is suppose to fill in * @return True if we filled all inputs, false if user has stopped processing * @throws IOException @@ -68,7 +69,7 @@ public final class FormFiller { // Fill in data from user return fillForms(line, - job.getConnectorPart().getForms(), + job.getConnectorPart(ConnectorType.FROM).getForms(), job.getFrameworkPart().getForms()); } @@ -77,25 +78,28 @@ public final class FormFiller { * * @param reader Associated console reader object * @param job Job that user is suppose to fill in - * @param connectorBundle Connector resource bundle + * @param fromConnectorBundle Connector resource bundle * @param frameworkBundle Framework resource bundle * @return True if we filled all inputs, false if user has stopped processing * @throws IOException */ public static boolean fillJob(ConsoleReader reader, MJob job, - ResourceBundle connectorBundle, - ResourceBundle frameworkBundle) + ResourceBundle fromConnectorBundle, + ResourceBundle frameworkBundle, + ResourceBundle toConnectorBundle) throws IOException { job.setName(getName(reader, job.getName())); // Fill in data from user return fillForms(reader, - job.getConnectorPart().getForms(), - connectorBundle, + job.getConnectorPart(ConnectorType.FROM).getForms(), + fromConnectorBundle, job.getFrameworkPart().getForms(), - frameworkBundle); + frameworkBundle, + job.getConnectorPart(ConnectorType.TO).getForms(), + toConnectorBundle); } /** @@ -387,8 +391,7 @@ public final class FormFiller { List<MForm> connectorForms, ResourceBundle connectorBundle, List<MForm> frameworkForms, - ResourceBundle frameworkBundle - ) throws IOException { + ResourceBundle frameworkBundle) throws IOException { // Query connector forms @@ -400,6 +403,32 @@ public final class FormFiller { if(!fillForms(frameworkForms, reader, frameworkBundle)) { return false; } + return true; + } + + public static boolean fillForms(ConsoleReader reader, + List<MForm> fromConnectorForms, + ResourceBundle fromConnectorBundle, + List<MForm> frameworkForms, + ResourceBundle frameworkBundle, + List<MForm> toConnectorForms, + ResourceBundle toConnectorBundle) throws IOException { + + + // From connector forms + if(!fillForms(fromConnectorForms, reader, fromConnectorBundle)) { + return false; + } + + // Query framework forms + if(!fillForms(frameworkForms, reader, frameworkBundle)) { + return false; + } + + // To connector forms + if(!fillForms(toConnectorForms, reader, toConnectorBundle)) { + return false; + } return true; } @@ -880,7 +909,7 @@ public final class FormFiller { } public static void printJobValidationMessages(MJob job) { - for (MForm form : job.getConnectorPart().getForms()) { + for (MForm form : job.getConnectorPart(ConnectorType.FROM).getForms()) { for (MInput<?> input : form.getInputs()) { printValidationMessage(input, true); } @@ -890,6 +919,11 @@ public final class FormFiller { printValidationMessage(input, true); } } + for (MForm form : job.getConnectorPart(ConnectorType.TO).getForms()) { + for (MInput<?> input : form.getInputs()) { + printValidationMessage(input, true); + } + } } private FormFiller() { http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/shell/src/main/java/org/apache/sqoop/shell/utils/JobDynamicFormOptions.java ---------------------------------------------------------------------- diff --git a/shell/src/main/java/org/apache/sqoop/shell/utils/JobDynamicFormOptions.java b/shell/src/main/java/org/apache/sqoop/shell/utils/JobDynamicFormOptions.java index aa118e1..40a4e33 100644 --- a/shell/src/main/java/org/apache/sqoop/shell/utils/JobDynamicFormOptions.java +++ b/shell/src/main/java/org/apache/sqoop/shell/utils/JobDynamicFormOptions.java @@ -19,6 +19,7 @@ package org.apache.sqoop.shell.utils; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; +import org.apache.sqoop.common.ConnectorType; import org.apache.sqoop.model.MJob; /** @@ -34,11 +35,14 @@ public class JobDynamicFormOptions extends DynamicFormOptions<MJob> { .withLongOpt("name") .hasArg() .create()); - for (Option option : FormOptions.getFormsOptions("connector", job.getConnectorPart().getForms())) { + for (Option option : FormOptions.getFormsOptions("connector", job.getConnectorPart(ConnectorType.FROM).getForms())) { this.addOption(option); } for (Option option : FormOptions.getFormsOptions("framework", job.getFrameworkPart().getForms())) { this.addOption(option); } + for (Option option : FormOptions.getFormsOptions("connector", job.getConnectorPart(ConnectorType.TO).getForms())) { + this.addOption(option); + } } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/shell/src/main/resources/shell-resource.properties ---------------------------------------------------------------------- diff --git a/shell/src/main/resources/shell-resource.properties b/shell/src/main/resources/shell-resource.properties index df9457d..d4c782e 100644 --- a/shell/src/main/resources/shell-resource.properties +++ b/shell/src/main/resources/shell-resource.properties @@ -30,9 +30,10 @@ object-name.help = Non unique name of the entity to help you remember \ # args.function.unknown = The specified function "{0}" is not recognized. args.xid_missing = Required argument --xid is missing. +args.fxid_missing = Required argument --fxid is missing. +args.txid_missing = Required argument --txid is missing. args.jid_missing = Required argument --jid is missing. args.cid_missing = Required argument --cid is missing. -args.type_missing = Required argument --type is missing. args.name_missing = Required argument --name is missing. args.value_missing = Required argument --value is missing. @@ -79,7 +80,7 @@ create.job_successful = New job was successfully created with validation \ status {0} and persistent id {1} ## Creating messages create.creating_conn = Creating connection for connector with id {0} -create.creating_job = Creating job for connection with id {0} +create.creating_job = Creating job for connections with id {0} and {1} # # Delete command @@ -193,8 +194,9 @@ table.header.id = Id table.header.name = Name table.header.version = Version table.header.class = Class -table.header.type = Type table.header.connector = Connector +table.header.connector.from = From Connector +table.header.connector.to = To Connector table.header.jid = Job Id table.header.eid = External Id table.header.status = Status @@ -205,7 +207,6 @@ table.header.enabled = Enabled formdisplayer.supported_job_types = Supported job types formdisplayer.connection = Connection formdisplayer.job = Job -formdisplayer.forms_jobtype = Forms for job type formdisplayer.form = form formdisplayer.name = Name formdisplayer.label = Label http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java index 50eb940..7081b4c 100644 --- a/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java +++ b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java @@ -20,11 +20,11 @@ package org.apache.sqoop.connector.spi; import java.util.Locale; import java.util.ResourceBundle; +import org.apache.sqoop.common.ConnectorType; import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; import org.apache.sqoop.connector.idf.IntermediateDataFormat; -import org.apache.sqoop.job.etl.Exporter; -import org.apache.sqoop.job.etl.Importer; -import org.apache.sqoop.model.MJob; +import org.apache.sqoop.job.etl.From; +import org.apache.sqoop.job.etl.To; import org.apache.sqoop.validation.Validator; /** @@ -53,17 +53,17 @@ public abstract class SqoopConnector { /** * @return Get job configuration class for given type or null if not supported */ - public abstract Class getJobConfigurationClass(MJob.Type jobType); + public abstract Class getJobConfigurationClass(ConnectorType jobType); /** - * @return an <tt>Importer</tt> that provides classes for performing import. + * @return an <tt>From</tt> that provides classes for performing import. */ - public abstract Importer getImporter(); + public abstract From getFrom(); /** - * @return an <tt>Exporter</tt> that provides classes for performing export. + * @return an <tt>To</tt> that provides classes for performing export. */ - public abstract Exporter getExporter(); + public abstract To getTo(); /** * Returns validation object that Sqoop framework can use to validate user http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/spi/src/main/java/org/apache/sqoop/job/etl/Exporter.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Exporter.java b/spi/src/main/java/org/apache/sqoop/job/etl/Exporter.java deleted file mode 100644 index cdaa623..0000000 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Exporter.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.job.etl; - -/** - * This specifies classes that perform connector-defined steps - * within export execution: - * Initializer - * -> (framework-defined steps) - * -> Loader - * -> Destroyer - */ -public class Exporter extends CallbackBase { - - private Class<? extends Loader> loader; - - public Exporter( - Class<? extends Initializer> initializer, - Class<? extends Loader> loader, - Class<? extends Destroyer> destroyer - ) { - super(initializer, destroyer); - this.loader = loader; - } - - public Class<? extends Loader> getLoader() { - return loader; - } - - @Override - public String toString() { - return "Exporter{" + super.toString() + - ", loader=" + loader + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/spi/src/main/java/org/apache/sqoop/job/etl/From.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/From.java b/spi/src/main/java/org/apache/sqoop/job/etl/From.java new file mode 100644 index 0000000..9b8d76f --- /dev/null +++ b/spi/src/main/java/org/apache/sqoop/job/etl/From.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.job.etl; + +/** + * This specifies classes that perform connector-defined steps + * within import execution: + * Initializer + * -> Partitioner + * -> Extractor + * -> (framework-defined steps) + * -> Destroyer + */ +public class From extends CallbackBase { + + private Class<? extends Partitioner> partitioner; + private Class<? extends Extractor> extractor; + + public From(Class<? extends Initializer> initializer, + Class<? extends Partitioner> partitioner, + Class<? extends Extractor> extractor, + Class<? extends Destroyer> destroyer) { + super(initializer, destroyer); + this.partitioner = partitioner; + this.extractor = extractor; + } + + public Class<? extends Partitioner> getPartitioner() { + return partitioner; + } + + public Class<? extends Extractor> getExtractor() { + return extractor; + } + + @Override + public String toString() { + return "Importer{" + super.toString() + + ", partitioner=" + partitioner.getName() + + ", extractor=" + extractor.getName() + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/spi/src/main/java/org/apache/sqoop/job/etl/Importer.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Importer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Importer.java deleted file mode 100644 index d4c9e70..0000000 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Importer.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.job.etl; - -/** - * This specifies classes that perform connector-defined steps - * within import execution: - * Initializer - * -> Partitioner - * -> Extractor - * -> (framework-defined steps) - * -> Destroyer - */ -public class Importer extends CallbackBase { - - private Class<? extends Partitioner> partitioner; - private Class<? extends Extractor> extractor; - - public Importer(Class<? extends Initializer> initializer, - Class<? extends Partitioner> partitioner, - Class<? extends Extractor> extractor, - Class<? extends Destroyer> destroyer) { - super(initializer, destroyer); - this.partitioner = partitioner; - this.extractor = extractor; - } - - public Class<? extends Partitioner> getPartitioner() { - return partitioner; - } - - public Class<? extends Extractor> getExtractor() { - return extractor; - } - - @Override - public String toString() { - return "Importer{" + super.toString() + - ", partitioner=" + partitioner.getName() + - ", extractor=" + extractor.getName() + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/spi/src/main/java/org/apache/sqoop/job/etl/To.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/To.java b/spi/src/main/java/org/apache/sqoop/job/etl/To.java new file mode 100644 index 0000000..a791945 --- /dev/null +++ b/spi/src/main/java/org/apache/sqoop/job/etl/To.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.job.etl; + +/** + * This specifies classes that perform connector-defined steps + * within export execution: + * Initializer + * -> (framework-defined steps) + * -> Loader + * -> Destroyer + */ +public class To extends CallbackBase { + + private Class<? extends Loader> loader; + + public To( + Class<? extends Initializer> initializer, + Class<? extends Loader> loader, + Class<? extends Destroyer> destroyer + ) { + super(initializer, destroyer); + this.loader = loader; + } + + public Class<? extends Loader> getLoader() { + return loader; + } + + @Override + public String toString() { + return "Exporter{" + super.toString() + + ", loader=" + loader + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/spi/src/main/java/org/apache/sqoop/validation/Validator.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/validation/Validator.java b/spi/src/main/java/org/apache/sqoop/validation/Validator.java index cf0b4aa..9b791f8 100644 --- a/spi/src/main/java/org/apache/sqoop/validation/Validator.java +++ b/spi/src/main/java/org/apache/sqoop/validation/Validator.java @@ -40,11 +40,10 @@ public class Validator { /** * Validate configuration object for job . * - * @param type Type of jobs that being validated * @param jobConfiguration Job to be validated * @return Validation status */ - public Validation validateJob(MJob.Type type, Object jobConfiguration) { + public Validation validateJob(Object jobConfiguration) { return new Validation(EmptyClass.class); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java ---------------------------------------------------------------------- diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java index a05274a..3c21421 100644 --- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java +++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java @@ -19,15 +19,14 @@ package org.apache.sqoop.submission.mapreduce; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.JobStatus; import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.log4j.Logger; +import org.apache.sqoop.common.ConnectorType; import org.apache.sqoop.common.MapContext; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.execution.mapreduce.MRSubmissionRequest; @@ -155,9 +154,6 @@ public class MapreduceSubmissionEngine extends SubmissionEngine { // Clone global configuration Configuration configuration = new Configuration(globalConfiguration); - // Serialize job type as it will be needed by underlying execution engine - ConfigurationUtils.setJobType(configuration, request.getJobType()); - // Serialize framework context into job configuration for(Map.Entry<String, String> entry: request.getFrameworkContext()) { if (entry.getValue() == null) { @@ -168,16 +164,26 @@ public class MapreduceSubmissionEngine extends SubmissionEngine { } // Serialize connector context as a sub namespace - for(Map.Entry<String, String> entry :request.getConnectorContext()) { + for(Map.Entry<String, String> entry : request.getConnectorContext(ConnectorType.FROM)) { if (entry.getValue() == null) { LOG.warn("Ignoring null connector context value for key " + entry.getKey()); continue; } configuration.set( - JobConstants.PREFIX_CONNECTOR_CONTEXT + entry.getKey(), + JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT + entry.getKey(), entry.getValue()); } + for(Map.Entry<String, String> entry : request.getConnectorContext(ConnectorType.TO)) { + if (entry.getValue() == null) { + LOG.warn("Ignoring null connector context value for key " + entry.getKey()); + continue; + } + configuration.set( + JobConstants.PREFIX_CONNECTOR_TO_CONTEXT + entry.getKey(), + entry.getValue()); + } + // Set up notification URL if it's available if(request.getNotificationUrl() != null) { configuration.set("job.end.notification.url", request.getNotificationUrl()); @@ -194,9 +200,12 @@ public class MapreduceSubmissionEngine extends SubmissionEngine { Job job = new Job(configuration); // And finally put all configuration objects to credentials cache - ConfigurationUtils.setConfigConnectorConnection(job, request.getConfigConnectorConnection()); - ConfigurationUtils.setConfigConnectorJob(job, request.getConfigConnectorJob()); - ConfigurationUtils.setConfigFrameworkConnection(job, request.getConfigFrameworkConnection()); + ConfigurationUtils.setConnectorConnectionConfig(ConnectorType.FROM, job, request.getConnectorConnectionConfig(ConnectorType.FROM)); + ConfigurationUtils.setConnectorJobConfig(ConnectorType.FROM, job, request.getConnectorJobConfig(ConnectorType.FROM)); + ConfigurationUtils.setConnectorConnectionConfig(ConnectorType.TO, job, request.getConnectorConnectionConfig(ConnectorType.TO)); + ConfigurationUtils.setConnectorJobConfig(ConnectorType.TO, job, request.getConnectorJobConfig(ConnectorType.TO)); + ConfigurationUtils.setFrameworkConnectionConfig(ConnectorType.FROM, job, request.getFrameworkConnectionConfig(ConnectorType.FROM)); + ConfigurationUtils.setFrameworkConnectionConfig(ConnectorType.TO, job, request.getFrameworkConnectionConfig(ConnectorType.TO)); ConfigurationUtils.setConfigFrameworkJob(job, request.getConfigFrameworkJob()); ConfigurationUtils.setConnectorSchema(job, request.getSummary().getConnectorSchema()); @@ -212,11 +221,6 @@ public class MapreduceSubmissionEngine extends SubmissionEngine { job.setMapOutputKeyClass(request.getMapOutputKeyClass()); job.setMapOutputValueClass(request.getMapOutputValueClass()); - String outputDirectory = request.getOutputDirectory(); - if(outputDirectory != null) { - FileOutputFormat.setOutputPath(job, new Path(outputDirectory)); - } - // Set number of reducers as number of configured loaders or suppress // reduce phase entirely if loaders are not set at all. if(request.getLoaders() != null) {
