http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java index b05954b..ef7ff4e 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java @@ -17,21 +17,20 @@ */ package org.apache.sqoop.execution.mapreduce; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.io.NullWritable; import org.apache.sqoop.common.MutableMapContext; import org.apache.sqoop.framework.ExecutionEngine; -import org.apache.sqoop.framework.SubmissionRequest; -import org.apache.sqoop.framework.configuration.JobConfiguration; +import org.apache.sqoop.framework.JobRequest; import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.etl.From; import org.apache.sqoop.job.etl.To; -import org.apache.sqoop.job.io.Data; import org.apache.sqoop.job.io.SqoopWritable; import org.apache.sqoop.job.mr.SqoopInputFormat; import org.apache.sqoop.job.mr.SqoopMapper; import org.apache.sqoop.job.mr.SqoopNullOutputFormat; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + /** * */ @@ -41,44 +40,40 @@ public class MapreduceExecutionEngine extends ExecutionEngine { * {@inheritDoc} */ @Override - public SubmissionRequest createSubmissionRequest() { - return new MRSubmissionRequest(); + public JobRequest createJobRequest() { + return new MRJobRequest(); } - public void prepareSubmission(SubmissionRequest gRequest) { - MRSubmissionRequest request = (MRSubmissionRequest)gRequest; + public void prepareJob(JobRequest jobRequest) { + MRJobRequest mrJobRequest = (MRJobRequest)jobRequest; // Add jar dependencies - addDependencies(request); + addDependencies(mrJobRequest); // Configure map-reduce classes for import - request.setInputFormatClass(SqoopInputFormat.class); + mrJobRequest.setInputFormatClass(SqoopInputFormat.class); - request.setMapperClass(SqoopMapper.class); - request.setMapOutputKeyClass(SqoopWritable.class); - request.setMapOutputValueClass(NullWritable.class); + mrJobRequest.setMapperClass(SqoopMapper.class); + mrJobRequest.setMapOutputKeyClass(SqoopWritable.class); + mrJobRequest.setMapOutputValueClass(NullWritable.class); - request.setOutputFormatClass(SqoopNullOutputFormat.class); - request.setOutputKeyClass(SqoopWritable.class); - request.setOutputValueClass(NullWritable.class); + mrJobRequest.setOutputFormatClass(SqoopNullOutputFormat.class); + mrJobRequest.setOutputKeyClass(SqoopWritable.class); + mrJobRequest.setOutputValueClass(NullWritable.class); - // Set up framework context - From from = (From)request.getFromCallback(); - To to = (To)request.getToCallback(); - MutableMapContext context = request.getFrameworkContext(); + From from = (From) mrJobRequest.getFrom(); + To to = (To) mrJobRequest.getTo(); + + MutableMapContext context = mrJobRequest.getFrameworkContext(); context.setString(JobConstants.JOB_ETL_PARTITIONER, from.getPartitioner().getName()); context.setString(JobConstants.JOB_ETL_EXTRACTOR, from.getExtractor().getName()); context.setString(JobConstants.JOB_ETL_LOADER, to.getLoader().getName()); context.setString(JobConstants.JOB_ETL_DESTROYER, from.getDestroyer().getName()); context.setString(JobConstants.INTERMEDIATE_DATA_FORMAT, - request.getIntermediateDataFormat().getName()); - - if(request.getExtractors() != null) { - context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors()); - } + mrJobRequest.getIntermediateDataFormat().getName()); - if(request.getExtractors() != null) { - context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors()); + if(mrJobRequest.getExtractors() != null) { + context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, mrJobRequest.getExtractors()); } } @@ -91,7 +86,7 @@ public class MapreduceExecutionEngine extends ExecutionEngine { * * @param request Active request object. */ - protected void addDependencies(MRSubmissionRequest request) { + protected void addDependencies(MRJobRequest request) { // Guava request.addJarForClass(ThreadFactoryBuilder.class); }
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java index 92414d8..2ed06a8 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java @@ -90,9 +90,6 @@ public final class ConfigurationUtils { private static final Text SCHEMA_TO_CONNECTOR_KEY = new Text(SCHEMA_TO_CONNECTOR); - private static final String SCHEMA_HIO = JobConstants.PREFIX_JOB_CONFIG + "schema.hio"; - - private static final Text SCHEMA_HIO_KEY = new Text(SCHEMA_HIO); /** * Persist Connector configuration object for connection. http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java index b73b151..4c2e206 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java @@ -31,9 +31,9 @@ public class ProgressRunnable implements Runnable { /** * Context class that we should use for reporting progress. */ - private final TaskInputOutputContext context; + private final TaskInputOutputContext<?,?,?,?> context; - public ProgressRunnable(final TaskInputOutputContext ctxt) { + public ProgressRunnable(final TaskInputOutputContext<?,?,?,?> ctxt) { this.context = ctxt; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java index 59431f4..e3af6e1 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java @@ -32,8 +32,7 @@ import org.apache.sqoop.utils.ClassUtils; */ public class SqoopDestroyerExecutor { - public static final Logger LOG = - Logger.getLogger(SqoopDestroyerExecutor.class); + public static final Logger LOG = Logger.getLogger(SqoopDestroyerExecutor.class); /** * Execute destroyer. @@ -56,10 +55,8 @@ public class SqoopDestroyerExecutor { Object fromConfigConnection = ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, configuration); Object fromConfigJob = ConfigurationUtils.getConnectorJobConfig(Direction.FROM, configuration); - // Propagate connector schema in every case for now - // TODO: Change to coditional choosing between Connector schemas. + // TODO(Abe/Gwen): Change to conditional choosing between schemas. Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, configuration); - DestroyerContext destroyerContext = new DestroyerContext(subContext, success, schema); LOG.info("Executing destroyer class " + destroyer.getClass()); http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java index bbf7342..3e2b1c5 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java @@ -64,9 +64,7 @@ public class SqoopFileOutputFormat conf.set(JobConstants.JOB_MR_OUTPUT_CODEC, codecname); } - SqoopOutputFormatLoadExecutor executor = - new SqoopOutputFormatLoadExecutor(context); - return executor.getRecordWriter(); + return new SqoopOutputFormatLoadExecutor(context).getRecordWriter(); } @Override http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java index 3065680..6680f60 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java @@ -54,7 +54,7 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, * Service for reporting progress to mapreduce. */ private final ScheduledExecutorService progressService = Executors.newSingleThreadScheduledExecutor(); - private IntermediateDataFormat data = null; + private IntermediateDataFormat<String> dataFormat = null; private SqoopWritable dataOut = null; @Override @@ -64,44 +64,36 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR); Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName); - // Propagate connector schema in every case for now - // TODO: Change to coditional choosing between Connector schemas. - + // TODO(Abe/Gwen): Change to conditional choosing between Connector schemas. Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, conf); - if (schema==null) { + if (schema == null) { schema = ConfigurationUtils.getConnectorSchema(Direction.TO, conf); } - if (schema==null) { + if (schema == null) { LOG.info("setting an empty schema"); } - - String intermediateDataFormatName = conf.get(JobConstants - .INTERMEDIATE_DATA_FORMAT); - data = (IntermediateDataFormat) ClassUtils.instantiate(intermediateDataFormatName); - data.setSchema(schema); + String intermediateDataFormatName = conf.get(JobConstants.INTERMEDIATE_DATA_FORMAT); + dataFormat = (IntermediateDataFormat<String>) ClassUtils + .instantiate(intermediateDataFormatName); + dataFormat.setSchema(schema); dataOut = new SqoopWritable(); - // Objects that should be pass to the Executor execution - PrefixContext subContext = null; - Object configConnection = null; - Object configJob = null; - - // Get configs for extractor - subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT); - configConnection = ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf); - configJob = ConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf); + // Objects that should be passed to the Executor execution + PrefixContext subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT); + Object fromConfig = ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf); + Object fromJob = ConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf); SqoopSplit split = context.getCurrentKey(); - ExtractorContext extractorContext = new ExtractorContext(subContext, new MapDataWriter(context), schema); + ExtractorContext extractorContext = new ExtractorContext(subContext, new SqoopMapDataWriter(context), schema); try { LOG.info("Starting progress service"); progressService.scheduleAtFixedRate(new ProgressRunnable(context), 0, 2, TimeUnit.MINUTES); LOG.info("Running extractor class " + extractorName); - extractor.extract(extractorContext, configConnection, configJob, split.getPartition()); + extractor.extract(extractorContext, fromConfig, fromJob, split.getPartition()); LOG.info("Extractor has finished"); context.getCounter(SqoopCounters.ROWS_READ) .increment(extractor.getRowsRead()); @@ -117,37 +109,37 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, } } - private class MapDataWriter extends DataWriter { + private class SqoopMapDataWriter extends DataWriter { private Context context; - public MapDataWriter(Context context) { + public SqoopMapDataWriter(Context context) { this.context = context; } @Override public void writeArrayRecord(Object[] array) { - data.setObjectData(array); + dataFormat.setObjectData(array); writeContent(); } @Override public void writeStringRecord(String text) { - data.setTextData(text); + dataFormat.setTextData(text); writeContent(); } @Override public void writeRecord(Object obj) { - data.setData(obj.toString()); + dataFormat.setData(obj.toString()); writeContent(); } private void writeContent() { try { if (LOG.isDebugEnabled()) { - LOG.debug("Extracted data: " + data.getTextData()); + LOG.debug("Extracted data: " + dataFormat.getTextData()); } - dataOut.setString(data.getTextData()); + dataOut.setString(dataFormat.getTextData()); context.write(dataOut, NullWritable.get()); } catch (Exception e) { throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0013, e); http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java index e457cff..2996275 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java @@ -51,9 +51,9 @@ public class SqoopOutputFormatLoadExecutor { private volatile boolean readerFinished = false; private volatile boolean writerFinished = false; - private volatile IntermediateDataFormat data; + private volatile IntermediateDataFormat<String> dataFormat; private JobContext context; - private SqoopRecordWriter producer; + private SqoopRecordWriter writer; private Future<?> consumerFuture; private Semaphore filled = new Semaphore(0, true); private Semaphore free = new Semaphore(1, true); @@ -63,14 +63,14 @@ public class SqoopOutputFormatLoadExecutor { SqoopOutputFormatLoadExecutor(boolean isTest, String loaderName){ this.isTest = isTest; this.loaderName = loaderName; - data = new CSVIntermediateDataFormat(); - producer = new SqoopRecordWriter(); + dataFormat = new CSVIntermediateDataFormat(); + writer = new SqoopRecordWriter(); } public SqoopOutputFormatLoadExecutor(JobContext jobctx) { context = jobctx; - producer = new SqoopRecordWriter(); - data = (IntermediateDataFormat) ClassUtils.instantiate(context + writer = new SqoopRecordWriter(); + dataFormat = (IntermediateDataFormat<String>) ClassUtils.instantiate(context .getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT)); Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration()); @@ -78,14 +78,14 @@ public class SqoopOutputFormatLoadExecutor { schema = ConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration()); } - data.setSchema(schema); + dataFormat.setSchema(schema); } public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() { consumerFuture = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat ("OutputFormatLoader-consumer").build()).submit( new ConsumerThread()); - return producer; + return writer; } /* @@ -98,7 +98,7 @@ public class SqoopOutputFormatLoadExecutor { public void write(SqoopWritable key, NullWritable value) throws InterruptedException { free.acquire(); checkIfConsumerThrew(); - data.setTextData(key.getString()); + dataFormat.setTextData(key.getString()); filled.release(); } @@ -144,7 +144,7 @@ public class SqoopOutputFormatLoadExecutor { } } - private class OutputFormatDataReader extends DataReader { + private class SqoopOutputFormatDataReader extends DataReader { @Override public Object[] readArrayRecord() throws InterruptedException { @@ -154,7 +154,7 @@ public class SqoopOutputFormatLoadExecutor { return null; } try { - return data.getObjectData(); + return dataFormat.getObjectData(); } finally { releaseSema(); } @@ -168,7 +168,7 @@ public class SqoopOutputFormatLoadExecutor { return null; } try { - return data.getTextData(); + return dataFormat.getTextData(); } finally { releaseSema(); } @@ -181,7 +181,7 @@ public class SqoopOutputFormatLoadExecutor { return null; } try { - return data.getData(); + return dataFormat.getData(); } catch (Throwable t) { readerFinished = true; LOG.error("Caught exception e while getting content ", t); @@ -215,7 +215,7 @@ public class SqoopOutputFormatLoadExecutor { public void run() { LOG.info("SqoopOutputFormatLoadExecutor consumer thread is starting"); try { - DataReader reader = new OutputFormatDataReader(); + DataReader reader = new SqoopOutputFormatDataReader(); Configuration conf = null; if (!isTest) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/shell/src/main/java/org/apache/sqoop/shell/utils/SubmissionDisplayer.java ---------------------------------------------------------------------- diff --git a/shell/src/main/java/org/apache/sqoop/shell/utils/SubmissionDisplayer.java b/shell/src/main/java/org/apache/sqoop/shell/utils/SubmissionDisplayer.java index 6dbd870..51f778b 100644 --- a/shell/src/main/java/org/apache/sqoop/shell/utils/SubmissionDisplayer.java +++ b/shell/src/main/java/org/apache/sqoop/shell/utils/SubmissionDisplayer.java @@ -68,14 +68,14 @@ public final class SubmissionDisplayer { } } - if(isVerbose() && submission.getConnectorSchema() != null) { + if(isVerbose() && submission.getFromSchema() != null) { print(resourceString(Constants.RES_CONNECTOR_SCHEMA)+": "); - println(submission.getConnectorSchema()); + println(submission.getFromSchema()); } - if(isVerbose() && submission.getHioSchema() != null) { + if(isVerbose() && submission.getToSchema() != null) { print(resourceString(Constants.RES_HIO_SCHEMA)+": "); - println(submission.getHioSchema()); + println(submission.getToSchema()); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/spi/src/main/java/org/apache/sqoop/job/etl/CallbackBase.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/CallbackBase.java b/spi/src/main/java/org/apache/sqoop/job/etl/CallbackBase.java deleted file mode 100644 index 59a9457..0000000 --- a/spi/src/main/java/org/apache/sqoop/job/etl/CallbackBase.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.job.etl; - -/** - * Set of default callbacks that must be implement by each job type. - */ -public abstract class CallbackBase { - - private Class<? extends Initializer> initializer; - private Class<? extends Destroyer> destroyer; - - public CallbackBase( - Class<? extends Initializer> initializer, - Class<? extends Destroyer> destroyer - ) { - this.initializer = initializer; - this.destroyer = destroyer; - } - - public Class<? extends Destroyer> getDestroyer() { - return destroyer; - } - - public Class<? extends Initializer> getInitializer() { - return initializer; - } - - @Override - public String toString() { - return "initializer=" + initializer.getName() + - ", destroyer=" + destroyer.getName(); - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/spi/src/main/java/org/apache/sqoop/job/etl/From.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/From.java b/spi/src/main/java/org/apache/sqoop/job/etl/From.java index 9b8d76f..80f4f29 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/From.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/From.java @@ -26,7 +26,7 @@ package org.apache.sqoop.job.etl; * -> (framework-defined steps) * -> Destroyer */ -public class From extends CallbackBase { +public class From extends Transferable { private Class<? extends Partitioner> partitioner; private Class<? extends Extractor> extractor; http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/spi/src/main/java/org/apache/sqoop/job/etl/To.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/To.java b/spi/src/main/java/org/apache/sqoop/job/etl/To.java index a791945..b8717ae 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/To.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/To.java @@ -25,7 +25,7 @@ package org.apache.sqoop.job.etl; * -> Loader * -> Destroyer */ -public class To extends CallbackBase { +public class To extends Transferable { private Class<? extends Loader> loader; http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/spi/src/main/java/org/apache/sqoop/job/etl/Transferable.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Transferable.java b/spi/src/main/java/org/apache/sqoop/job/etl/Transferable.java new file mode 100644 index 0000000..dfe1d5e --- /dev/null +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Transferable.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.job.etl; + +/** + * This entity encapsulates the workflow for data transfer via the + * {@link SqoopConnector}.It basically acts as an adapter between the data-source + * imported from or exported to. + */ +public abstract class Transferable { + + private Class<? extends Initializer> initializer; + private Class<? extends Destroyer> destroyer; + + public Transferable( + Class<? extends Initializer> initializer, + Class<? extends Destroyer> destroyer + ) { + this.initializer = initializer; + this.destroyer = destroyer; + } + + public Class<? extends Destroyer> getDestroyer() { + return destroyer; + } + + public Class<? extends Initializer> getInitializer() { + return initializer; + } + + @Override + public String toString() { + return "initializer=" + initializer.getName() + + ", destroyer=" + destroyer.getName(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/spi/src/main/java/org/apache/sqoop/validation/Validator.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/validation/Validator.java b/spi/src/main/java/org/apache/sqoop/validation/Validator.java index 9b791f8..f31adb5 100644 --- a/spi/src/main/java/org/apache/sqoop/validation/Validator.java +++ b/spi/src/main/java/org/apache/sqoop/validation/Validator.java @@ -17,7 +17,6 @@ */ package org.apache.sqoop.validation; -import org.apache.sqoop.model.MJob; /** * Connection and job metadata validator. http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java ---------------------------------------------------------------------- diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java index bfa6958..93741e6 100644 --- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java +++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java @@ -29,9 +29,9 @@ import org.apache.log4j.Logger; import org.apache.sqoop.common.Direction; import org.apache.sqoop.common.MapContext; import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.execution.mapreduce.MRSubmissionRequest; +import org.apache.sqoop.execution.mapreduce.MRJobRequest; import org.apache.sqoop.execution.mapreduce.MapreduceExecutionEngine; -import org.apache.sqoop.framework.SubmissionRequest; +import org.apache.sqoop.framework.JobRequest; import org.apache.sqoop.framework.SubmissionEngine; import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.mr.ConfigurationUtils; @@ -72,6 +72,7 @@ public class MapreduceSubmissionEngine extends SubmissionEngine { */ @Override public void initialize(MapContext context, String prefix) { + super.initialize(context, prefix); LOG.info("Initializing Map-reduce Submission Engine"); // Build global configuration, start with empty configuration object @@ -125,6 +126,7 @@ public class MapreduceSubmissionEngine extends SubmissionEngine { */ @Override public void destroy() { + super.destroy(); LOG.info("Destroying Mapreduce Submission Engine"); // Closing job client @@ -147,9 +149,9 @@ public class MapreduceSubmissionEngine extends SubmissionEngine { * {@inheritDoc} */ @Override - public boolean submit(SubmissionRequest generalRequest) { + public boolean submit(JobRequest mrJobRequest) { // We're supporting only map reduce jobs - MRSubmissionRequest request = (MRSubmissionRequest) generalRequest; + MRJobRequest request = (MRJobRequest) mrJobRequest; // Clone global configuration Configuration configuration = new Configuration(globalConfiguration); @@ -208,7 +210,7 @@ public class MapreduceSubmissionEngine extends SubmissionEngine { ConfigurationUtils.setFrameworkConnectionConfig(Direction.TO, job, request.getFrameworkConnectionConfig(Direction.TO)); ConfigurationUtils.setFrameworkJobConfig(job, request.getConfigFrameworkJob()); // @TODO(Abe): Persist TO schema. - ConfigurationUtils.setConnectorSchema(Direction.FROM, job, request.getSummary().getConnectorSchema()); + ConfigurationUtils.setConnectorSchema(Direction.FROM, job, request.getSummary().getFromSchema()); if(request.getJobName() != null) { job.setJobName("Sqoop: " + request.getJobName());
