SQOOP-677 Destroyer needs to be called from OutputCommitter (Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/ef12bf50 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/ef12bf50 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/ef12bf50 Branch: refs/heads/sqoop2 Commit: ef12bf508faf524f64ed52fb048e92b8a5b34398 Parents: 13c5c06 Author: Bilung Lee <[email protected]> Authored: Mon Nov 26 08:45:02 2012 -0800 Committer: Bilung Lee <[email protected]> Committed: Mon Nov 26 08:45:02 2012 -0800 ---------------------------------------------------------------------- .../connector/jdbc/GenericJdbcExportDestroyer.java | 8 +- .../connector/jdbc/GenericJdbcImportDestroyer.java | 4 +- .../apache/sqoop/framework/FrameworkManager.java | 3 +- .../sqoop/job/mr/SqoopDestroyerExecutor.java | 64 +++++++++++++++ .../apache/sqoop/job/mr/SqoopFileOutputFormat.java | 31 +++++++ .../apache/sqoop/job/mr/SqoopNullOutputFormat.java | 26 +++++- .../java/org/apache/sqoop/job/etl/Destroyer.java | 13 +++- 7 files changed, 139 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/ef12bf50/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java index 7f952ac..37149de 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java @@ -17,14 +17,16 @@ */ package org.apache.sqoop.connector.jdbc; +import org.apache.log4j.Logger; import org.apache.sqoop.common.ImmutableContext; import org.apache.sqoop.job.etl.Destroyer; public class GenericJdbcExportDestroyer extends Destroyer { + private static final Logger LOG = Logger.getLogger(GenericJdbcExportDestroyer.class); + @Override - public void run(ImmutableContext context) { - // TODO Auto-generated method stub + public void destroy(boolean success, ImmutableContext context, Object connectionConfig, Object jobConfig) { + LOG.info("Running generic JDBC connector destroyer"); } - } http://git-wip-us.apache.org/repos/asf/sqoop/blob/ef12bf50/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java index a53fa59..e09b0c3 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java @@ -23,8 +23,8 @@ import org.apache.sqoop.job.etl.Destroyer; public class GenericJdbcImportDestroyer extends Destroyer { @Override - public void run(ImmutableContext context) { - // TODO Auto-generated method stub + public void destroy(boolean success, ImmutableContext context, Object connectionConfig, Object jobConfig) { + // No explicit action at the moment } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/ef12bf50/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java index 79c9acc..b012d23 100644 --- a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java +++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java @@ -433,7 +433,8 @@ public final class FrameworkManager { } // Initialize submission from connector perspective - destroyer.run(request.getConnectorContext()); + destroyer.destroy(false, request.getConnectorContext(), + request.getConfigConnectorConnection(), request.getConfigConnectorJob()); } public static MSubmission stop(long jobId) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/ef12bf50/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java new file mode 100644 index 0000000..36eb65d --- /dev/null +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.job.mr; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.sqoop.job.JobConstants; +import org.apache.sqoop.job.PrefixContext; +import org.apache.sqoop.job.etl.Destroyer; +import org.apache.sqoop.utils.ClassUtils; + +/** + * Helper class to execute destroyers on mapreduce job side. + */ +public class SqoopDestroyerExecutor { + + public static final Log LOG = + LogFactory.getLog(SqoopNullOutputFormat.class.getName()); + + /** + * Execute destroyer. + * + * @param success True if the job execution was successfull + * @param configuration Configuration object to get destroyer class with context + * and configuration objects. + * @param propertyName Name of property that holds destroyer class. + */ + public static void executeDestroyer(boolean success, Configuration configuration, String propertyName) { + Destroyer destroyer = (Destroyer) ClassUtils.instantiate(configuration.get(propertyName)); + + if(destroyer == null) { + LOG.info("Skipping running destroyer as non was defined."); + return; + } + + // Objects that should be pass to the Destroyer execution + PrefixContext subContext = new PrefixContext(configuration, JobConstants.PREFIX_CONNECTOR_CONTEXT); + Object configConnection = ConfigurationUtils.getConnectorConnection(configuration); + Object configJob = ConfigurationUtils.getConnectorJob(configuration); + + destroyer.destroy(success, subContext, configConnection, configJob); + } + + private SqoopDestroyerExecutor() { + // Instantiation is prohibited + } + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ef12bf50/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java index c221cbf..813f370 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java @@ -27,8 +27,12 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.DefaultCodec; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.io.Data; @@ -66,4 +70,31 @@ public class SqoopFileOutputFormat return executor.getRecordWriter(); } + public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException { + Path output = getOutputPath(context); + return new DestroyerFileOutputCommitter(output, context); + } + + public class DestroyerFileOutputCommitter extends FileOutputCommitter { + + public DestroyerFileOutputCommitter(Path outputPath, TaskAttemptContext context) throws IOException { + super(outputPath, context); + } + + @Override + public void commitJob(JobContext context) throws IOException { + super.commitJob(context); + + Configuration config = context.getConfiguration(); + SqoopDestroyerExecutor.executeDestroyer(true, config, JobConstants.JOB_ETL_DESTROYER); + } + + @Override + public void abortJob(JobContext context, JobStatus.State state) throws IOException { + super.abortJob(context, state); + + Configuration config = context.getConfiguration(); + SqoopDestroyerExecutor.executeDestroyer(false, config, JobConstants.JOB_ETL_DESTROYER); + } + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/ef12bf50/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java index 1242f90..54604a7 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java @@ -20,14 +20,19 @@ package org.apache.sqoop.job.mr; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.io.Data; +import java.io.IOException; + /** * An output format for MapReduce job. */ @@ -51,15 +56,30 @@ public class SqoopNullOutputFormat extends OutputFormat<Data, NullWritable> { @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) { - // return an output committer that does nothing - return new NullOutputCommitter(); + return new DestroyerOutputCommitter(); } - class NullOutputCommitter extends OutputCommitter { + class DestroyerOutputCommitter extends OutputCommitter { @Override public void setupJob(JobContext jobContext) { } @Override + public void commitJob(JobContext jobContext) throws IOException { + super.commitJob(jobContext); + + Configuration config = jobContext.getConfiguration(); + SqoopDestroyerExecutor.executeDestroyer(true, config, JobConstants.JOB_ETL_DESTROYER); + } + + @Override + public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException { + super.abortJob(jobContext, state); + + Configuration config = jobContext.getConfiguration(); + SqoopDestroyerExecutor.executeDestroyer(false, config, JobConstants.JOB_ETL_DESTROYER); + } + + @Override public void setupTask(TaskAttemptContext taskContext) { } @Override http://git-wip-us.apache.org/repos/asf/sqoop/blob/ef12bf50/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java index c8dc7c3..528d550 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java @@ -25,6 +25,17 @@ import org.apache.sqoop.common.ImmutableContext; */ public abstract class Destroyer { - public abstract void run(ImmutableContext context); + /** + * Callback to clean up after job execution. + * + * @param success True if the execution was successfull + * @param context Connector context object + * @param connectionConfiguration Connection configuration object + * @param jobConfiguration Job configuration object + */ + public abstract void destroy(boolean success, + ImmutableContext context, + Object connectionConfiguration, + Object jobConfiguration); }
