Updated Branches: refs/heads/sqoop2 1f016abb9 -> 2e196a841
SQOOP-1115. Sqoop2: Integration: Provide helper methods for synchronous job submission (Jarek Jarcec Cecho via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/2e196a84 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/2e196a84 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/2e196a84 Branch: refs/heads/sqoop2 Commit: 2e196a841964bdeb69e035636a480e9dee6b1af6 Parents: 1f016ab Author: Hari Shreedharan <[email protected]> Authored: Thu Jul 11 18:22:34 2013 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Thu Jul 11 18:23:51 2013 -0700 ---------------------------------------------------------------------- .../sqoop/test/testcases/ConnectorTestCase.java | 42 ++++++++++++++++++++ .../connector/jdbc/generic/TableExportTest.java | 10 +---- .../connector/jdbc/generic/TableImportTest.java | 10 +---- 3 files changed, 44 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/2e196a84/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java index 6aeadd4..bf1c91f 100644 --- a/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java +++ b/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java @@ -18,12 +18,14 @@ package org.apache.sqoop.test.testcases; import org.apache.log4j.Logger; +import org.apache.sqoop.client.SubmissionCallback; import org.apache.sqoop.framework.configuration.OutputFormat; import org.apache.sqoop.framework.configuration.StorageType; import org.apache.sqoop.model.MConnection; import org.apache.sqoop.model.MFormList; import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MPersistableEntity; +import org.apache.sqoop.model.MSubmission; import org.apache.sqoop.test.asserts.ProviderAsserts; import org.apache.sqoop.test.data.Cities; import org.apache.sqoop.test.db.DatabaseProvider; @@ -47,6 +49,26 @@ abstract public class ConnectorTestCase extends TomcatTestCase { protected static DatabaseProvider provider; + /** + * Default submission callbacks that are printing various status about the submission. + */ + protected static SubmissionCallback DEFAULT_SUBMISSION_CALLBACKS = new SubmissionCallback() { + @Override + public void submitted(MSubmission submission) { + LOG.info("Submission submitted: " + submission); + } + + @Override + public void updated(MSubmission submission) { + LOG.info("Submission updated: " + submission); + } + + @Override + public void finished(MSubmission submission) { + LOG.info("Submission finished: " + submission); + } + }; + @BeforeClass public static void startProvider() throws Exception { provider = DatabaseProviderFactory.getProvider(System.getProperties()); @@ -174,4 +196,24 @@ abstract public class ConnectorTestCase extends TomcatTestCase { assertEquals(Status.FINE, getClient().createJob(job)); assertNotSame(MPersistableEntity.PERSISTANCE_ID_DEFAULT, job.getPersistenceId()); } + + /** + * Run job with given jid. + * + * @param jid Job id + * @throws Exception + */ + protected void runJob(long jid) throws Exception { + getClient().startSubmission(jid, DEFAULT_SUBMISSION_CALLBACKS, 100); + } + + /** + * Run given job. + * + * @param job Job object + * @throws Exception + */ + protected void runJob(MJob job) throws Exception { + runJob(job.getPersistenceId()); + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/2e196a84/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableExportTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableExportTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableExportTest.java index b36cb07..ee8b97b 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableExportTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableExportTest.java @@ -59,15 +59,7 @@ public class TableExportTest extends ConnectorTestCase { fillInputForm(job); createJob(job); - MSubmission submission = getClient().startSubmission(job.getPersistenceId()); - assertTrue("Unexpected value: " + submission.getStatus(), submission.getStatus().isRunning()); - - // Wait until the job finish - this active waiting will be removed once - // Sqoop client API will get blocking support. - do { - Thread.sleep(5000); - submission = getClient().getSubmissionStatus(job.getPersistenceId()); - } while(submission.getStatus().isRunning()); + runJob(job); assertEquals(4L, rowCount()); assertRowInCities(1, "USA", "San Francisco"); http://git-wip-us.apache.org/repos/asf/sqoop/blob/2e196a84/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableImportTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableImportTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableImportTest.java index 639d9ad..b500828 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableImportTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableImportTest.java @@ -56,15 +56,7 @@ public class TableImportTest extends ConnectorTestCase { fillOutputForm(job, StorageType.HDFS, OutputFormat.TEXT_FILE); createJob(job); - MSubmission submission = getClient().startSubmission(job.getPersistenceId()); - assertTrue(submission.getStatus().isRunning()); - - // Wait until the job finish - this active waiting will be removed once - // Sqoop client API will get blocking support. - do { - Thread.sleep(5000); - submission = getClient().getSubmissionStatus(job.getPersistenceId()); - } while(submission.getStatus().isRunning()); + runJob(job); // Assert correct output assertMapreduceOutput(
