http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java ---------------------------------------------------------------------- diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java new file mode 100644 index 0000000..3433b20 --- /dev/null +++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.repository.derby; + +import org.apache.sqoop.model.MSubmission; +import org.apache.sqoop.submission.SubmissionStatus; + +import java.util.Calendar; +import java.util.Date; +import java.util.List; + +/** + * + */ +public class TestSubmissionHandling extends DerbyTestCase { + + DerbyRepositoryHandler handler; + + @Override + public void setUp() throws Exception { + super.setUp(); + + handler = new DerbyRepositoryHandler(); + + // We always need schema for this test case + createSchema(); + + // We always need connector and framework structures in place + loadConnectorAndFramework(); + + // We also always need connection metadata in place + loadConnections(); + + // And finally we always need job metadata in place + loadJobs(); + } + + public void testFindSubmissionsUnfinished() throws Exception { + List<MSubmission> submissions; + + submissions = handler.findSubmissionsUnfinished(getDerbyConnection()); + assertNotNull(submissions); + assertEquals(0, submissions.size()); + + loadSubmissions(); + + submissions = handler.findSubmissionsUnfinished(getDerbyConnection()); + assertNotNull(submissions); + assertEquals(2, submissions.size()); + } + + public void testExistsSubmission() throws Exception { + // There shouldn't be anything on empty repository + assertFalse(handler.existsSubmission(1, getDerbyConnection())); + assertFalse(handler.existsSubmission(2, getDerbyConnection())); + assertFalse(handler.existsSubmission(3, getDerbyConnection())); + assertFalse(handler.existsSubmission(4, getDerbyConnection())); + assertFalse(handler.existsSubmission(5, getDerbyConnection())); + assertFalse(handler.existsSubmission(6, getDerbyConnection())); + + loadSubmissions(); + + assertTrue(handler.existsSubmission(1, getDerbyConnection())); + assertTrue(handler.existsSubmission(2, getDerbyConnection())); + assertTrue(handler.existsSubmission(3, getDerbyConnection())); + assertTrue(handler.existsSubmission(4, getDerbyConnection())); + 
assertTrue(handler.existsSubmission(5, getDerbyConnection())); + assertFalse(handler.existsSubmission(6, getDerbyConnection())); + } + + public void testCreateSubmission() throws Exception { + MSubmission submission = + new MSubmission(1, new Date(), SubmissionStatus.RUNNING, "job-x"); + + handler.createSubmission(submission, getDerbyConnection()); + + assertEquals(1, submission.getPersistenceId()); + assertCountForTable("SQOOP.SQ_SUBMISSION", 1); + + List<MSubmission> submissions = + handler.findSubmissionsUnfinished(getDerbyConnection()); + assertNotNull(submissions); + assertEquals(1, submissions.size()); + + submission = submissions.get(0); + + assertEquals(1, submission.getJobId()); + assertEquals(SubmissionStatus.RUNNING, submission.getStatus()); + assertEquals("job-x", submission.getExternalId()); + + // Let's create a second submission + submission = + new MSubmission(1, new Date(), SubmissionStatus.SUCCEEDED, "job-x"); + handler.createSubmission(submission, getDerbyConnection()); + + assertEquals(2, submission.getPersistenceId()); + assertCountForTable("SQOOP.SQ_SUBMISSION", 2); + } + + public void testUpdateConnection() throws Exception { + loadSubmissions(); + + List<MSubmission> submissions = + handler.findSubmissionsUnfinished(getDerbyConnection()); + assertNotNull(submissions); + assertEquals(2, submissions.size()); + + MSubmission submission = submissions.get(0); + submission.setStatus(SubmissionStatus.SUCCEEDED); + + handler.updateSubmission(submission, getDerbyConnection()); + + submissions = handler.findSubmissionsUnfinished(getDerbyConnection()); + assertNotNull(submissions); + assertEquals(1, submissions.size()); + } + + public void testPurgeSubmissions() throws Exception { + loadSubmissions(); + List<MSubmission> submissions; + + submissions = handler.findSubmissionsUnfinished(getDerbyConnection()); + assertNotNull(submissions); + assertEquals(2, submissions.size()); + assertCountForTable("SQOOP.SQ_SUBMISSION", 5); + + Calendar calendar = 
Calendar.getInstance(); + // 2012-01-03 05:05:05 + calendar.set(2012, Calendar.JANUARY, 3, 5, 5, 5); + handler.purgeSubmissions(calendar.getTime(), getDerbyConnection()); + + submissions = handler.findSubmissionsUnfinished(getDerbyConnection()); + assertNotNull(submissions); + assertEquals(1, submissions.size()); + assertCountForTable("SQOOP.SQ_SUBMISSION", 2); + + handler.purgeSubmissions(new Date(), getDerbyConnection()); + + submissions = handler.findSubmissionsUnfinished(getDerbyConnection()); + assertNotNull(submissions); + assertEquals(0, submissions.size()); + assertCountForTable("SQOOP.SQ_SUBMISSION", 0); + + handler.purgeSubmissions(new Date(), getDerbyConnection()); + + submissions = handler.findSubmissionsUnfinished(getDerbyConnection()); + assertNotNull(submissions); + assertEquals(0, submissions.size()); + assertCountForTable("SQOOP.SQ_SUBMISSION", 0); + } +}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/server/pom.xml ---------------------------------------------------------------------- diff --git a/server/pom.xml b/server/pom.xml index 78ad8ee..71aa6c9 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -46,6 +46,12 @@ limitations under the License. </dependency> <dependency> + <groupId>org.apache.sqoop.submission</groupId> + <artifactId>sqoop-submission-mapreduce</artifactId> + <version>2.0.0-SNAPSHOT</version> + </dependency> + + <dependency> <groupId>org.apache.sqoop.repository</groupId> <artifactId>sqoop-repository-derby</artifactId> <version>2.0.0-SNAPSHOT</version> http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java index eba334e..64ef84a 100644 --- a/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java +++ b/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java @@ -33,7 +33,7 @@ import org.apache.sqoop.repository.RepositoryManager; import org.apache.sqoop.server.RequestContext; import org.apache.sqoop.server.RequestHandler; import org.apache.sqoop.server.common.ServerError; -import org.apache.sqoop.utils.ClassLoadingUtils; +import org.apache.sqoop.utils.ClassUtils; import org.apache.sqoop.validation.Status; import org.apache.sqoop.validation.Validation; import org.apache.sqoop.validation.Validator; @@ -158,9 +158,9 @@ public class ConnectionRequestHandler implements RequestHandler { Validator frameworkValidator = FrameworkManager.getValidator(); // We need translate forms to configuration objects - Object connectorConfig = ClassLoadingUtils.instantiate( + Object connectorConfig = ClassUtils.instantiate( 
connector.getConnectionConfigurationClass()); - Object frameworkConfig = ClassLoadingUtils.instantiate( + Object frameworkConfig = ClassUtils.instantiate( FrameworkManager.getConnectionConfigurationClass()); FormUtils.fillValues( http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/server/src/main/java/org/apache/sqoop/handler/ConnectorRequestHandler.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/sqoop/handler/ConnectorRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/ConnectorRequestHandler.java index fda91fd..8a52243 100644 --- a/server/src/main/java/org/apache/sqoop/handler/ConnectorRequestHandler.java +++ b/server/src/main/java/org/apache/sqoop/handler/ConnectorRequestHandler.java @@ -60,7 +60,7 @@ public class ConnectorRequestHandler implements RequestHandler { Long id = Long.parseLong(cid); // Check that user is not asking for non existing connector id - if(!ConnectorManager.getConnectoIds().contains(id)) { + if(!ConnectorManager.getConnectorIds().contains(id)) { throw new SqoopException(ServerError.SERVER_0004, "Invalid id " + id); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java index 0589e30..070b290 100644 --- a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java +++ b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java @@ -33,7 +33,7 @@ import org.apache.sqoop.repository.RepositoryManager; import org.apache.sqoop.server.RequestContext; import org.apache.sqoop.server.RequestHandler; import org.apache.sqoop.server.common.ServerError; -import org.apache.sqoop.utils.ClassLoadingUtils; +import org.apache.sqoop.utils.ClassUtils; 
import org.apache.sqoop.validation.Status; import org.apache.sqoop.validation.Validation; import org.apache.sqoop.validation.Validator; @@ -159,10 +159,10 @@ public class JobRequestHandler implements RequestHandler { Validator frameworkValidator = FrameworkManager.getValidator(); // We need translate forms to configuration objects - Object connectorConfig = ClassLoadingUtils.instantiate( - connector.getConnectionConfigurationClass()); - Object frameworkConfig = ClassLoadingUtils.instantiate( - FrameworkManager.getConnectionConfigurationClass()); + Object connectorConfig = ClassUtils.instantiate( + connector.getJobConfigurationClass(job.getType())); + Object frameworkConfig = ClassUtils.instantiate( + FrameworkManager.getJobConfigurationClass(job.getType())); FormUtils.fillValues(job.getConnectorPart().getForms(), connectorConfig); FormUtils.fillValues(job.getFrameworkPart().getForms(), frameworkConfig); http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java new file mode 100644 index 0000000..e9e6551 --- /dev/null +++ b/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.handler; + +import org.apache.log4j.Logger; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.framework.FrameworkManager; +import org.apache.sqoop.json.JsonBean; +import org.apache.sqoop.json.SubmissionBean; +import org.apache.sqoop.model.MSubmission; +import org.apache.sqoop.server.RequestContext; +import org.apache.sqoop.server.RequestHandler; +import org.apache.sqoop.server.common.ServerError; + +/** + * Submission request handler supports the following resources: + * + * GET /v1/submission/action/:jid + * Get status of the last submission for job with id :jid + * + * POST /v1/submission/action/:jid + * Create a new submission for job with id :jid + * + * DELETE /v1/submission/action/:jid + * Stop the last submission for job with id :jid + * + * Possible additions in the future: /v1/submission/history/* for history. 
+ */ +public class SubmissionRequestHandler implements RequestHandler { + + private final Logger logger = Logger.getLogger(getClass()); + + public SubmissionRequestHandler() { + logger.info("SubmissionRequestHandler initialized"); + } + + @Override + public JsonBean handleEvent(RequestContext ctx) { + String[] urlElements = ctx.getUrlElements(); + if (urlElements.length < 2) { + throw new SqoopException(ServerError.SERVER_0003, + "Invalid URL, too few arguments for this servlet."); + } + + // Let's check + int length = urlElements.length; + String action = urlElements[length - 2]; + + if(action.equals("action")) { + return handleActionEvent(ctx, urlElements[length - 1]); + } + + throw new SqoopException(ServerError.SERVER_0003, + "Do not know what to do."); + } + + private JsonBean handleActionEvent(RequestContext ctx, String sjid) { + long jid = Long.parseLong(sjid); + + switch (ctx.getMethod()) { + case GET: + return submissionStatus(jid); + case POST: + return submissionSubmit(jid); + case DELETE: + return submissionStop(jid); + } + + return null; + } + + private JsonBean submissionStop(long jid) { + MSubmission submission = FrameworkManager.stop(jid); + return new SubmissionBean(submission); + } + + private JsonBean submissionSubmit(long jid) { + MSubmission submission = FrameworkManager.submit(jid); + return new SubmissionBean(submission); + } + + private JsonBean submissionStatus(long jid) { + MSubmission submission = FrameworkManager.status(jid); + return new SubmissionBean(submission); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/server/src/main/java/org/apache/sqoop/server/RequestContext.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/sqoop/server/RequestContext.java b/server/src/main/java/org/apache/sqoop/server/RequestContext.java index 78950f6..c6b6569 100644 --- a/server/src/main/java/org/apache/sqoop/server/RequestContext.java +++ 
b/server/src/main/java/org/apache/sqoop/server/RequestContext.java @@ -85,6 +85,13 @@ public class RequestContext { } /** + * Return all elements in the url as an array + */ + public String[] getUrlElements() { + return getRequest().getRequestURI().split("/"); + } + + /** * Get locale specified in accept-language HTTP header. * * @return First specified locale http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java b/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java index 993c153..ae0735b 100644 --- a/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java +++ b/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java @@ -37,18 +37,22 @@ public class ServerInitializer implements ServletContextListener { Logger.getLogger(ServerInitializer.class); public void contextDestroyed(ServletContextEvent arg0) { + LOG.info("Shutting down Sqoop server"); FrameworkManager.destroy(); ConnectorManager.destroy(); RepositoryManager.destroy(); SqoopConfiguration.destroy(); + LOG.info("Sqoop server has been correctly terminated"); } public void contextInitialized(ServletContextEvent arg0) { try { + LOG.info("Booting up Sqoop server"); SqoopConfiguration.initialize(); RepositoryManager.initialize(); ConnectorManager.initialize(); FrameworkManager.initialize(); + LOG.info("Sqoop server has successfully boot up"); } catch (RuntimeException ex) { LOG.error("Server startup failure", ex); throw ex; http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/server/src/main/java/org/apache/sqoop/server/v1/SubmissionServlet.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/sqoop/server/v1/SubmissionServlet.java 
b/server/src/main/java/org/apache/sqoop/server/v1/SubmissionServlet.java new file mode 100644 index 0000000..7252e11 --- /dev/null +++ b/server/src/main/java/org/apache/sqoop/server/v1/SubmissionServlet.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.server.v1; + +import org.apache.sqoop.handler.SubmissionRequestHandler; +import org.apache.sqoop.json.JsonBean; +import org.apache.sqoop.server.RequestContext; +import org.apache.sqoop.server.RequestHandler; +import org.apache.sqoop.server.SqoopProtocolServlet; + +/** + * + */ +public class SubmissionServlet extends SqoopProtocolServlet { + + private RequestHandler requestHandler; + + public SubmissionServlet() { + requestHandler = new SubmissionRequestHandler(); + } + + @Override + protected JsonBean handleGetRequest(RequestContext ctx) throws Exception { + return requestHandler.handleEvent(ctx); + } + + @Override + protected JsonBean handlePostRequest(RequestContext ctx) throws Exception { + return requestHandler.handleEvent(ctx); + } + + @Override + protected JsonBean handleDeleteRequest(RequestContext ctx) throws Exception { + return requestHandler.handleEvent(ctx); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/server/src/main/webapp/WEB-INF/web.xml ---------------------------------------------------------------------- diff --git a/server/src/main/webapp/WEB-INF/web.xml b/server/src/main/webapp/WEB-INF/web.xml index 69229bf..f053062 100644 --- a/server/src/main/webapp/WEB-INF/web.xml +++ b/server/src/main/webapp/WEB-INF/web.xml @@ -87,5 +87,18 @@ limitations under the License. 
<url-pattern>/v1/job/*</url-pattern> </servlet-mapping> + <!-- Submission servlet --> + <servlet> + <servlet-name>v1.SubmissionServlet</servlet-name> + <servlet-class>org.apache.sqoop.server.v1.SubmissionServlet</servlet-class> + <load-on-startup>1</load-on-startup> + </servlet> + + <servlet-mapping> + <servlet-name>v1.SubmissionServlet</servlet-name> + <url-pattern>/v1/submission/*</url-pattern> + </servlet-mapping> + + </web-app> http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/spi/src/main/java/org/apache/sqoop/job/etl/CallbackBase.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/CallbackBase.java b/spi/src/main/java/org/apache/sqoop/job/etl/CallbackBase.java new file mode 100644 index 0000000..59a9457 --- /dev/null +++ b/spi/src/main/java/org/apache/sqoop/job/etl/CallbackBase.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.job.etl; + +/** + * Set of default callbacks that must be implemented by each job type. + */ +public abstract class CallbackBase { + + private Class<? extends Initializer> initializer; + private Class<? 
extends Destroyer> destroyer; + + public CallbackBase( + Class<? extends Initializer> initializer, + Class<? extends Destroyer> destroyer + ) { + this.initializer = initializer; + this.destroyer = destroyer; + } + + public Class<? extends Destroyer> getDestroyer() { + return destroyer; + } + + public Class<? extends Initializer> getInitializer() { + return initializer; + } + + @Override + public String toString() { + return "initializer=" + initializer.getName() + + ", destroyer=" + destroyer.getName(); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/spi/src/main/java/org/apache/sqoop/job/etl/Context.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Context.java b/spi/src/main/java/org/apache/sqoop/job/etl/Context.java deleted file mode 100644 index fc01c96..0000000 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Context.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.job.etl; - -/** - * The context for getting configuration values. 
- */ -public interface Context { - - String getString(String key); - -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java index af766f3..37b9f1b 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java @@ -17,12 +17,15 @@ */ package org.apache.sqoop.job.etl; +import org.apache.sqoop.common.MapContext; + /** * This allows connector to define work to complete execution, for example, * resource cleaning. */ public abstract class Destroyer { - public abstract void run(Context context); + // TODO(Jarcec): This should be called with ImmutableContext + public abstract void run(MapContext context); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/spi/src/main/java/org/apache/sqoop/job/etl/Exporter.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Exporter.java b/spi/src/main/java/org/apache/sqoop/job/etl/Exporter.java index ef690bf..cdaa623 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Exporter.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Exporter.java @@ -25,32 +25,27 @@ package org.apache.sqoop.job.etl; * -> Loader * -> Destroyer */ -public class Exporter { +public class Exporter extends CallbackBase { - private Class<? extends Initializer> initializer; private Class<? extends Loader> loader; - private Class<? extends Destroyer> destroyer; public Exporter( Class<? extends Initializer> initializer, Class<? extends Loader> loader, Class<? extends Destroyer> destroyer ) { - this.initializer = initializer; + super(initializer, destroyer); this.loader = loader; - this.destroyer = destroyer; - } - - public Class<? 
extends Initializer> getInitializer() { - return initializer; } public Class<? extends Loader> getLoader() { return loader; } - public Class<? extends Destroyer> getDestroyer() { - return destroyer; + @Override + public String toString() { + return "Exporter{" + super.toString() + + ", loader=" + loader + + '}'; } - } http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java b/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java index 20bdeda..ba04be9 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java @@ -17,6 +17,7 @@ */ package org.apache.sqoop.job.etl; +import org.apache.sqoop.common.ImmutableContext; import org.apache.sqoop.job.io.DataWriter; /** @@ -25,7 +26,10 @@ import org.apache.sqoop.job.io.DataWriter; */ public abstract class Extractor { - public abstract void run(Context context, - Partition partition, DataWriter writer); + public abstract void run(ImmutableContext context, + Object connectionConfiguration, + Object jobConfiguration, + Partition partition, + DataWriter writer); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/spi/src/main/java/org/apache/sqoop/job/etl/Importer.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Importer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Importer.java index f0a8d1a..d4c9e70 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Importer.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Importer.java @@ -26,25 +26,18 @@ package org.apache.sqoop.job.etl; * -> (framework-defined steps) * -> Destroyer */ -public class Importer { +public class Importer extends CallbackBase { - private Class<? 
extends Initializer> initializer; private Class<? extends Partitioner> partitioner; private Class<? extends Extractor> extractor; - private Class<? extends Destroyer> destroyer; public Importer(Class<? extends Initializer> initializer, Class<? extends Partitioner> partitioner, Class<? extends Extractor> extractor, Class<? extends Destroyer> destroyer) { - this.initializer = initializer; + super(initializer, destroyer); this.partitioner = partitioner; this.extractor = extractor; - this.destroyer = destroyer; - } - - public Class<? extends Initializer> getInitializer() { - return initializer; } public Class<? extends Partitioner> getPartitioner() { @@ -55,8 +48,11 @@ public class Importer { return extractor; } - public Class<? extends Destroyer> getDestroyer() { - return destroyer; + @Override + public String toString() { + return "Importer{" + super.toString() + + ", partitioner=" + partitioner.getName() + + ", extractor=" + extractor.getName() + + '}'; } - } http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java index 75bd42e..2092815 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java @@ -17,12 +17,42 @@ */ package org.apache.sqoop.job.etl; +import org.apache.sqoop.common.MapContext; +import org.apache.sqoop.common.MutableMapContext; + +import java.util.LinkedList; +import java.util.List; + /** * This allows connector to define initialization work for execution, * for example, context configuration. */ public abstract class Initializer { - public abstract void run(MutableContext context, Options options); + /** + * Initialize new submission based on given configuration properties. 
Any + * needed temporary values might be saved to context object and they will be + * promoted to all other parts of the workflow automatically. + * + * @param context Changeable context object, purely for connector usage + * @param connectionConfiguration Connector's connection configuration object + * @param jobConfiguration Connector's job configuration object + */ + public abstract void initialize(MutableMapContext context, + Object connectionConfiguration, + Object jobConfiguration); + + /** + * Return list of all jars that this particular connector needs to operate + * on the following job. This method will be called after running initialize + * method. + * + * @return List of jars needed by the connector; this default implementation returns an empty list + */ + public List<String> getJars(MapContext context, + Object connectionConfiguration, + Object jobConfiguration) { + return new LinkedList<String>(); + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java b/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java index 5474927..3a708df 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java @@ -17,6 +17,7 @@ */ package org.apache.sqoop.job.etl; +import org.apache.sqoop.common.ImmutableContext; import org.apache.sqoop.job.io.DataReader; /** @@ -24,6 +25,6 @@ import org.apache.sqoop.job.io.DataReader; */ public abstract class Loader { - public abstract void run(Context context, DataReader reader); + public abstract void run(ImmutableContext context, DataReader reader); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/spi/src/main/java/org/apache/sqoop/job/etl/MutableContext.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/MutableContext.java 
b/spi/src/main/java/org/apache/sqoop/job/etl/MutableContext.java deleted file mode 100644 index 03678c5..0000000 --- a/spi/src/main/java/org/apache/sqoop/job/etl/MutableContext.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.job.etl; - -/** - * The context for getting and setting configuration values. - */ -public interface MutableContext extends Context { - - void setString(String key, String value); - -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/spi/src/main/java/org/apache/sqoop/job/etl/Options.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Options.java b/spi/src/main/java/org/apache/sqoop/job/etl/Options.java deleted file mode 100644 index 2dc4671..0000000 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Options.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.job.etl; - -/** - * The options provided from user input. - */ -public interface Options { - - public String getOption(String key); - -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/spi/src/main/java/org/apache/sqoop/job/etl/Partition.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Partition.java b/spi/src/main/java/org/apache/sqoop/job/etl/Partition.java index 8834c80..db07844 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Partition.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Partition.java @@ -36,4 +36,11 @@ public abstract class Partition { */ public abstract void write(DataOutput out) throws IOException; + /** + * Each partition must be easily serializable to human readable form so that + * it can be logged for debugging purpose. 
+ * + * @return Human readable representation + */ + public abstract String toString(); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java b/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java index 21310be..3a525c4 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java @@ -17,6 +17,8 @@ */ package org.apache.sqoop.job.etl; +import org.apache.sqoop.common.ImmutableContext; + import java.util.List; /** @@ -25,6 +27,8 @@ import java.util.List; */ public abstract class Partitioner { - public abstract List<Partition> run(Context context); + public abstract List<Partition> getPartitions(ImmutableContext context, + Object connectionConfiguration, + Object jobConfiguration); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/submission/mapreduce/pom.xml ---------------------------------------------------------------------- diff --git a/submission/mapreduce/pom.xml b/submission/mapreduce/pom.xml new file mode 100644 index 0000000..03c06c0 --- /dev/null +++ b/submission/mapreduce/pom.xml @@ -0,0 +1,67 @@ +<?xml version="1.0"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.sqoop</groupId> + <artifactId>submission</artifactId> + <version>2.0.0-SNAPSHOT</version> + </parent> + + <groupId>org.apache.sqoop.submission</groupId> + <artifactId>sqoop-submission-mapreduce</artifactId> + <version>2.0.0-SNAPSHOT</version> + <name>Sqoop Mapreduce Submission Engine</name> + + <dependencies> + <dependency> + <groupId>org.apache.sqoop</groupId> + <artifactId>sqoop-core</artifactId> + <version>2.0.0-SNAPSHOT</version> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.sqoop</groupId> + <artifactId>sqoop-core</artifactId> + <version>2.0.0-SNAPSHOT</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-jobclient</artifactId> + <scope>provided</scope> + </dependency> + + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/Constants.java ---------------------------------------------------------------------- diff 
--git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/Constants.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/Constants.java new file mode 100644 index 0000000..e562701 --- /dev/null +++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/Constants.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.sqoop.submission.mapreduce;

/**
 * Names of the configuration properties understood by the Mapreduce
 * submission engine.
 */
public class Constants {

  private Constants() {
    // Pure holder of constants, never meant to be instantiated
  }

  /**
   * Namespace under which all properties of the Mapreduce submission engine
   * are registered.
   */
  public static final String PREFIX_MAPREDUCE = "mapreduce.";

  /**
   * Property naming the directory that contains the hadoop configuration
   * files.
   */
  public static final String CONF_CONFIG_DIR =
    PREFIX_MAPREDUCE + "configuration.directory";
}
+ */ +package org.apache.sqoop.submission.mapreduce; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.JobStatus; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.log4j.Logger; +import org.apache.sqoop.common.MapContext; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.framework.SubmissionRequest; +import org.apache.sqoop.framework.SubmissionEngine; +import org.apache.sqoop.job.JobConstants; +import org.apache.sqoop.model.FormUtils; +import org.apache.sqoop.submission.counter.Counters; +import org.apache.sqoop.submission.SubmissionStatus; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.net.MalformedURLException; +import java.util.Map; + + +/** + * This is very simple and straightforward implementation of map-reduce based + * submission engine. + */ +public class MapreduceSubmissionEngine extends SubmissionEngine { + + + private static Logger LOG = Logger.getLogger(MapreduceSubmissionEngine.class); + + /** + * Global configuration object that is build from hadoop configuration files + * on engine initialization and cloned during each new submission creation. + */ + private Configuration globalConfiguration; + + /** + * Job client that is configured to talk to one specific Job tracker. 
+ */ + private JobClient jobClient; + + + /** + * {@inheritDoc} + */ + @Override + public void initialize(MapContext context, String prefix) { + LOG.info("Initializing Map-reduce Submission Engine"); + + // Build global configuration, start with empty configuration object + globalConfiguration = new Configuration(); + globalConfiguration.clear(); + + // Load configured hadoop configuration directory + String configDirectory = context.getString(prefix + Constants.CONF_CONFIG_DIR); + + // Git list of files ending with "-site.xml" (configuration files) + File dir = new File(configDirectory); + String [] files = dir.list(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.endsWith("-site.xml"); + } + }); + + // Add each such file to our global configuration object + for (String file : files) { + LOG.info("Found hadoop configuration file " + file); + try { + globalConfiguration.addResource(new File(configDirectory, file).toURI().toURL()); + } catch (MalformedURLException e) { + LOG.error("Can't load configuration file: " + file, e); + } + } + + // Create job client + try { + jobClient = new JobClient(new Configuration(globalConfiguration)); + } catch (IOException e) { + throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0002, e); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void destroy() { + LOG.info("Destroying Mapreduce Submission Engine"); + } + + /** + * {@inheritDoc} + */ + @Override + @SuppressWarnings("unchecked") + public boolean submit(SubmissionRequest request) { + // Clone global configuration + Configuration configuration = new Configuration(globalConfiguration); + + // Serialize framework context into job configuration + for(Map.Entry<String, String> entry: request.getFrameworkContext()) { + configuration.set(entry.getKey(), entry.getValue()); + } + + // Serialize connector context as a sub namespace + for(Map.Entry<String, String> entry :request.getConnectorContext()) { + 
configuration.set( + JobConstants.PREFIX_CONNECTOR_CONTEXT + entry.getKey(), + entry.getValue()); + } + + // Serialize configuration objects - Firstly configuration classes + configuration.set(JobConstants.JOB_CONFIG_CLASS_CONNECTOR_CONNECTION, + request.getConfigConnectorConnection().getClass().getName()); + configuration.set(JobConstants.JOB_CONFIG_CLASS_CONNECTOR_JOB, + request.getConfigConnectorJob().getClass().getName()); + configuration.set(JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION, + request.getConfigFrameworkConnection().getClass().getName()); + configuration.set(JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_JOB, + request.getConfigFrameworkJob().getClass().getName()); + + // And finally configuration data + configuration.set(JobConstants.JOB_CONFIG_CONNECTOR_CONNECTION, + FormUtils.toJson(request.getConfigConnectorConnection())); + configuration.set(JobConstants.JOB_CONFIG_CONNECTOR_JOB, + FormUtils.toJson(request.getConfigConnectorJob())); + configuration.set(JobConstants.JOB_CONFIG_FRAMEWORK_CONNECTION, + FormUtils.toJson(request.getConfigFrameworkConnection())); + configuration.set(JobConstants.JOB_CONFIG_FRAMEWORK_JOB, + FormUtils.toJson(request.getConfigFrameworkConnection())); + + // Promote all required jars to the job + StringBuilder sb = new StringBuilder(); + boolean first = true; + for(String jar : request.getJars()) { + if(first) { + first = false; + } else { + sb.append(","); + } + LOG.debug("Adding jar to the job: " + jar); + sb.append(jar); + } + configuration.set("tmpjars", sb.toString()); + + try { + Job job = Job.getInstance(configuration); + job.setJobName(request.getJobName()); + + job.setInputFormatClass(request.getInputFormatClass()); + + job.setMapperClass(request.getMapperClass()); + job.setMapOutputKeyClass(request.getMapOutputKeyClass()); + job.setMapOutputValueClass(request.getMapOutputValueClass()); + + String outputDirectory = request.getOutputDirectory(); + if(outputDirectory != null) { + FileOutputFormat.setOutputPath(job, 
new Path(outputDirectory)); + } + + // TODO(jarcec): Harcoded no reducers + job.setNumReduceTasks(0); + + job.setOutputFormatClass(request.getOutputFormatClass()); + job.setOutputKeyClass(request.getOutputKeyClass()); + job.setOutputValueClass(request.getOutputValueClass()); + + job.submit(); + + String jobId = job.getJobID().toString(); + request.getSummary().setExternalId(jobId); + request.getSummary().setExternalLink(job.getTrackingURL()); + + LOG.debug("Executed new map-reduce job with id " + jobId); + } catch (Exception e) { + LOG.error("Error in submitting job", e); + return false; + } + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public void stop(String submissionId) { + try { + RunningJob runningJob = jobClient.getJob(JobID.forName(submissionId)); + if(runningJob == null) { + return; + } + + runningJob.killJob(); + } catch (IOException e) { + throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e); + } + } + + /** + * {@inheritDoc} + */ + @Override + public SubmissionStatus status(String submissionId) { + try { + RunningJob runningJob = jobClient.getJob(JobID.forName(submissionId)); + if(runningJob == null) { + return SubmissionStatus.UNKNOWN; + } + + int status = runningJob.getJobState(); + return convertMapreduceState(status); + + } catch (IOException e) { + throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e); + } + } + + /** + * {@inheritDoc} + */ + @Override + public double progress(String submissionId) { + try { + // Get some reasonable approximation of map-reduce job progress + // TODO(jarcec): What if we're running without reducers? 
+ RunningJob runningJob = jobClient.getJob(JobID.forName(submissionId)); + if(runningJob == null) { + // Return default value + return super.progress(submissionId); + } + + return (runningJob.mapProgress() + runningJob.reduceProgress()) / 2; + } catch (IOException e) { + throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e); + } + } + + /** + * {@inheritDoc} + */ + @Override + public Counters stats(String submissionId) { + //TODO(jarcec): Not supported yet + return super.stats(submissionId); + } + + /** + * {@inheritDoc} + */ + @Override + public String externalLink(String submissionId) { + try { + RunningJob runningJob = jobClient.getJob(JobID.forName(submissionId)); + if(runningJob == null) { + return null; + } + + return runningJob.getTrackingURL(); + } catch (IOException e) { + throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e); + } + } + + /** + * Convert map-reduce specific job status constants to Sqoop job status + * constants. + * + * @param status Map-reduce job constant + * @return Equivalent submission status + */ + protected SubmissionStatus convertMapreduceState(int status) { + if(status == JobStatus.PREP) { + return SubmissionStatus.BOOTING; + } else if (status == JobStatus.RUNNING) { + return SubmissionStatus.RUNNING; + } else if (status == JobStatus.FAILED) { + return SubmissionStatus.FAILED; + } else if (status == JobStatus.KILLED) { + return SubmissionStatus.FAILED; + } else if (status == JobStatus.SUCCEEDED) { + return SubmissionStatus.SUCCEEDED; + } + + throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0004, + "Unknown status " + status); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionError.java ---------------------------------------------------------------------- diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionError.java 
b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionError.java new file mode 100644 index 0000000..9296717 --- /dev/null +++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionError.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.submission.mapreduce; + +import org.apache.sqoop.common.ErrorCode; + +/** + * + */ +public enum MapreduceSubmissionError implements ErrorCode { + + MAPREDUCE_0001("Unknown error"), + + MAPREDUCE_0002("Failure on submission engine initialization"), + + MAPREDUCE_0003("Can't get RunningJob instance"), + + MAPREDUCE_0004("Unknown map reduce job status"), + + ; + + private final String message; + + private MapreduceSubmissionError(String message) { + this.message = message; + } + + public String getCode() { + return name(); + } + + public String getMessage() { + return message; + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/submission/pom.xml ---------------------------------------------------------------------- diff --git a/submission/pom.xml b/submission/pom.xml new file mode 100644 index 0000000..16550d9 --- /dev/null +++ b/submission/pom.xml @@ -0,0 +1,36 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+--><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache</groupId> + <artifactId>sqoop</artifactId> + <version>2.0.0-SNAPSHOT</version> + </parent> + + <groupId>org.apache.sqoop</groupId> + <artifactId>submission</artifactId> + <name>Sqoop Submission Engines</name> + <packaging>pom</packaging> + + <modules> + <module>mapreduce</module> + </modules> + +</project>
