Repository: falcon Updated Branches: refs/heads/master 6e162747b -> 3c011688b
FALCON-2259 Unregister an extension only if no extension jobs are dependant on the extension Author: sandeep <[email protected]> Reviewers: @pallavi-rao Closes #342 from sandeepSamudrala/FALCON-2259 and squashes the following commits: d317d3d [sandeep] Fixed tests 6617993 [sandeep] Incorporated review comments 46ad617 [sandeep] Incorporated review comments c1189be [sandeep] Incorporated review comments a70f5a9 [sandeep] FALCON-2259 Unregister an extension only if no extension jobs are dependant on the extension 7e16263 [sandeep] Merge branch 'master' of https://github.com/apache/falcon a234d94 [sandeep] FALCON-2231 Incoporated review comments and small fixes for duplicate submission and colo addition to schedule command 26e3350 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 73fbf75 [sandeep] Merge branch 'master' of https://github.com/apache/falcon cc28658 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 089b10d [sandeep] Merge branch 'master' of https://github.com/apache/falcon 456d4ee [sandeep] Merge branch 'master' of https://github.com/apache/falcon 0cf9af6 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 4a2e23e [sandeep] Merge branch 'master' of https://github.com/apache/falcon b1546ed [sandeep] Merge branch 'master' of https://github.com/apache/falcon 0a433fb [sandeep] Merge branch 'master' of https://github.com/apache/falcon 194f36a [sandeep] Merge branch 'master' of https://github.com/apache/falcon e0ad358 [sandeep] Merge branch 'master' of https://github.com/apache/falcon f96a084 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 9cf36e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon bbca081 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 48f6afa [sandeep] Merge branch 'master' of https://github.com/apache/falcon 250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon 1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon c065566 [sandeep] reverting last line changes made 1a4dcd2 [sandeep] rebased and resolved the conflicts from master 271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay. a94d4fe [sandeep] rebasing from master 9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/3c011688 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/3c011688 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/3c011688 Branch: refs/heads/master Commit: 3c011688bf910ee47786afa6f6c582d9c8d356ef Parents: 6e16274 Author: sandeep <[email protected]> Authored: Thu Jan 19 10:53:06 2017 +0530 Committer: Pallavi Rao <[email protected]> Committed: Thu Jan 19 10:53:06 2017 +0530 ---------------------------------------------------------------------- .../falcon/persistence/ExtensionJobsBean.java | 3 +- .../persistence/PersistenceConstants.java | 7 ++-- .../extensions/jdbc/ExtensionMetaStore.java | 12 ++++++ .../falcon/extensions/store/ExtensionStore.java | 12 ++++++ .../resource/AbstractExtensionManager.java | 24 +++++++++--- .../apache/falcon/unit/FalconUnitClient.java | 6 ++- .../falcon/unit/LocalExtensionManager.java | 2 +- .../apache/falcon/unit/FalconUnitTestBase.java | 18 ++++----- .../org/apache/falcon/unit/TestFalconUnit.java | 39 ++++++++++---------- 9 files changed, 84 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/3c011688/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java b/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java index 15a4dac..b6ac79d 100644 --- a/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java +++ b/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java @@ -44,7 +44,8 @@ import java.util.List; @NamedQueries({ @NamedQuery(name = PersistenceConstants.GET_ALL_EXTENSION_JOBS, query = "select OBJECT(a) from ExtensionJobsBean a "), @NamedQuery(name = PersistenceConstants.DELETE_EXTENSION_JOB, query = "delete from ExtensionJobsBean a where a.jobName = :jobName "), - @NamedQuery(name = PersistenceConstants.GET_EXTENSION_JOB, query = "select OBJECT(a) from ExtensionJobsBean a where a.jobName = :jobName") + @NamedQuery(name = PersistenceConstants.GET_EXTENSION_JOB, query = "select OBJECT(a) from ExtensionJobsBean a where a.jobName = :jobName"), + @NamedQuery(name = PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION, query = "select OBJECT(a) from ExtensionJobsBean a where a.extensionName = :extensionName") }) //RESUME CHECKSTYLE CHECK LineLengthCheck public class ExtensionJobsBean { http://git-wip-us.apache.org/repos/asf/falcon/blob/3c011688/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java index e80f7b7..1e6a04b 100644 --- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java +++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java @@ -60,9 +60,9 @@ public final class PersistenceConstants { public static final String GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE"; public static final String GET_LATEST_INSTANCE_TIME = "GET_LATEST_INSTANCE_TIME"; - public static final String GET_ENTITY_ALERTS = "GET_ENTITY_ALERTS"; - public static final String GET_ALL_ENTITY_ALERTS = "GET_ALL_ENTITY_ALERTS"; - public static final String GET_SLA_HIGH_CANDIDATES = "GET_SLA_HIGH_CANDIDATES"; + static final String GET_ENTITY_ALERTS = "GET_ENTITY_ALERTS"; + static final String GET_ALL_ENTITY_ALERTS = "GET_ALL_ENTITY_ALERTS"; + static final String GET_SLA_HIGH_CANDIDATES = "GET_SLA_HIGH_CANDIDATES"; public static final String UPDATE_SLA_HIGH = "UPDATE_SLA_HIGH"; public static final String GET_ENTITY_ALERT_INSTANCE = "GET_ENTITY_ALERT_INSTANCE"; @@ -80,5 +80,6 @@ public final class PersistenceConstants { public static final String GET_ALL_EXTENSION_JOBS = "GET_ALL_EXTENSION_JOBS"; public static final String DELETE_EXTENSION_JOB = "DELETE_EXTENSION_JOB"; public static final String GET_EXTENSION_JOB = "GET_EXTENSION_JOB"; + public static final String GET_JOBS_FOR_AN_EXTENSION = "GET_JOBS_FOR_AN_EXTENSION"; public static final String GET_ALL_PROCESS_INFO_INSTANCES = "GET_ALL_PROCESS_INFO_INSTANCES"; } http://git-wip-us.apache.org/repos/asf/falcon/blob/3c011688/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java index df5d6c9..b47766c 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java @@ -137,6 +137,18 @@ public class ExtensionMetaStore { } } + public List<ExtensionJobsBean> getJobsForAnExtension(String extensionName) { + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + Query query = entityManager.createNamedQuery(PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION); + query.setParameter(EXTENSION_NAME, extensionName); + try { + return (List<ExtensionJobsBean>)query.getResultList(); + } finally { + commitAndCloseTransaction(entityManager); + } + } + public void deleteExtension(String extensionName){ EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); http://git-wip-us.apache.org/repos/asf/falcon/blob/3c011688/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java index c50d6de..c3b4feb 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java @@ -28,6 +28,7 @@ import org.apache.falcon.extensions.ExtensionType; import org.apache.falcon.extensions.jdbc.ExtensionMetaStore; import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.persistence.ExtensionBean; +import org.apache.falcon.persistence.ExtensionJobsBean; import org.apache.falcon.util.StartupProperties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -382,6 +383,17 @@ public final class ExtensionStore { return (storePath != null); } + public List<String> getJobsForAnExtension(final String extensionName) throws FalconException { + List<ExtensionJobsBean> extensionJobs = metaStore.getJobsForAnExtension(extensionName); + List<String> extensionJobNames = new ArrayList<>(); + if (null != extensionJobs && !extensionJobs.isEmpty()) { + for (ExtensionJobsBean extensionJobsBean : extensionJobs) { + extensionJobNames.add(extensionJobsBean.getJobName()); + } + } + return extensionJobNames; + } + public String updateExtensionStatus(final String extensionName, String currentUser, ExtensionStatus status) throws FalconException { validateStatusChange(extensionName, currentUser); http://git-wip-us.apache.org/repos/asf/falcon/blob/3c011688/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java index 63974f2..8ada576 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java @@ -17,6 +17,7 @@ */ package org.apache.falcon.resource; +import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.FalconWebException; @@ -99,13 +100,26 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager { } } - public APIResult deleteExtensionMetadata(String extensionName){ + public APIResult deleteExtensionMetadata(String extensionName) { validateExtensionName(extensionName); + ExtensionStore metaStore = ExtensionStore.get(); try { - return new APIResult(APIResult.Status.SUCCEEDED, ExtensionStore.get().deleteExtension(extensionName, - CurrentUser.getUser())); - } catch (Throwable e) { - throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); + canDeleteExtension(extensionName); + return new APIResult(APIResult.Status.SUCCEEDED, + metaStore.deleteExtension(extensionName, CurrentUser.getUser())); + } catch (FalconException e) { + throw FalconWebException.newAPIException(e); + } + } + + private void canDeleteExtension(String extensionName) throws FalconException { + ExtensionStore metaStore = ExtensionStore.get(); + List<String> extensionJobs = metaStore.getJobsForAnExtension(extensionName); + if (!extensionJobs.isEmpty()) { + LOG.error("Extension:{} cannot be unregistered as {} are instances of the extension", extensionName, + ArrayUtils.toString(extensionJobs)); + throw new FalconException("Extension:" + extensionName + " cannot be unregistered as following instances" + + " are dependent on the extension" + ArrayUtils.toString(extensionJobs)); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/3c011688/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java index d76dbca..9358958 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java @@ -277,7 +277,11 @@ public class FalconUnitClient extends AbstractFalconClient { @Override public APIResult unregisterExtension(String extensionName) { - return localExtensionManager.unRegisterExtension(extensionName); + try { + return localExtensionManager.unRegisterExtension(extensionName); + } catch (FalconException e) { + throw new FalconCLIException("Failed in unRegistering the extension"+ e.getMessage()); + } } @Override http://git-wip-us.apache.org/repos/asf/falcon/blob/3c011688/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java index a7303b1..ca39ddb 100644 --- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java +++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java @@ -179,7 +179,7 @@ public class LocalExtensionManager extends AbstractExtensionManager { return super.registerExtensionMetadata(extensionName, packagePath, description, CurrentUser.getUser()); } - APIResult unRegisterExtension(String extensionName) { + APIResult unRegisterExtension(String extensionName) throws FalconException { return super.deleteExtensionMetadata(extensionName); } http://git-wip-us.apache.org/repos/asf/falcon/blob/3c011688/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java index 0dd09c1..e9367d5 100644 --- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java +++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java @@ -218,29 +218,29 @@ public class FalconUnitTestBase { return props; } - public String registerExtension(String extensionName, String packagePath, String description) + APIResult registerExtension(String extensionName, String packagePath, String description) throws IOException, FalconException { - return falconUnitClient.registerExtension(extensionName, packagePath, description).getMessage(); + return falconUnitClient.registerExtension(extensionName, packagePath, description); } - public String disableExtension(String extensionName) { + String disableExtension(String extensionName) { return falconUnitClient.disableExtension(extensionName).getMessage(); } - public String enableExtension(String extensionName) { + String enableExtension(String extensionName) { return falconUnitClient.enableExtension(extensionName).getMessage(); } - public String getExtensionJobDetails(String jobName) { - return falconUnitClient.getExtensionJobDetails(jobName).getMessage(); + APIResult getExtensionJobDetails(String jobName) { + return falconUnitClient.getExtensionJobDetails(jobName); } - public String unregisterExtension(String extensionName) { - return falconUnitClient.unregisterExtension(extensionName).getMessage(); + APIResult unregisterExtension(String extensionName) { + return falconUnitClient.unregisterExtension(extensionName); } - public APIResult submitExtensionJob(String extensionName, String jobName, String configPath, String doAsUser) { + APIResult submitExtensionJob(String extensionName, String jobName, String configPath, String doAsUser) { return falconUnitClient.submitExtensionJob(extensionName, jobName, configPath, doAsUser); } http://git-wip-us.apache.org/repos/asf/falcon/blob/3c011688/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java index 29cfed4..72280f7 100644 --- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java +++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java @@ -245,7 +245,7 @@ public class TestFalconUnit extends FalconUnitTestBase { assertStatus(result); } - public void setDummyProperty(Process process) { + private void setDummyProperty(Process process) { Property property = new Property(); property.setName("dummy"); property.setValue("dummy"); @@ -424,13 +424,12 @@ public class TestFalconUnit extends FalconUnitTestBase { clearDB(); submitCluster(); createExtensionPackage(); - - String result = registerExtension("testExtension", new Path(STORAGE_URL + EXTENSION_PATH).toString() + APIResult apiResult = registerExtension("testExtension", new Path(STORAGE_URL + EXTENSION_PATH).toString() , "testExtension"); - Assert.assertEquals(result, "Extension :testExtension registered successfully."); - - result = unregisterExtension("testExtension"); - Assert.assertEquals(result, "Deleted extension:testExtension"); + assertStatus(apiResult); + apiResult = unregisterExtension("testExtension"); + assertStatus(apiResult); + Assert.assertEquals(apiResult.getMessage(), "Deleted extension:testExtension"); } @Test @@ -441,8 +440,8 @@ public class TestFalconUnit extends FalconUnitTestBase { createDir(PROCESS_APP_PATH); fs.copyFromLocalFile(new Path(getAbsolutePath(WORKFLOW)), new Path(PROCESS_APP_PATH, "workflow.xml")); String packageBuildLib = new Path(EXTENSION_PATH, "libs/build/").toString(); - String result = registerExtension(TEST_EXTENSION, STORAGE_URL + EXTENSION_PATH, TEST_EXTENSION); - Assert.assertEquals(result, "Extension :testExtension registered successfully."); + APIResult apiResult = registerExtension(TEST_EXTENSION, STORAGE_URL + EXTENSION_PATH, TEST_EXTENSION); + assertStatus(apiResult); disableExtension(TEST_EXTENSION); createDir(PROCESS_APP_PATH); @@ -457,10 +456,10 @@ public class TestFalconUnit extends FalconUnitTestBase { } enableExtension(TEST_EXTENSION); - APIResult apiResult = submitExtensionJob(TEST_EXTENSION, TEST_JOB, null, null); + apiResult = submitExtensionJob(TEST_EXTENSION, TEST_JOB, null, null); assertStatus(apiResult); - result = getExtensionJobDetails(TEST_JOB); - JSONObject resultJson = new JSONObject(result); + apiResult = getExtensionJobDetails(TEST_JOB); + JSONObject resultJson = new JSONObject(apiResult.getMessage()); Assert.assertEquals(resultJson.get("extensionName"), TEST_EXTENSION); Process process = (Process) getClient().getDefinition(EntityType.PROCESS.toString(), "sample", null); Assert.assertEquals(process.getPipelines(), "testPipeline"); @@ -482,7 +481,7 @@ public class TestFalconUnit extends FalconUnitTestBase { apiResult = updateExtensionJob(TEST_JOB, getAbsolutePath(EXTENSION_PROPERTIES), null); assertStatus(apiResult); - String processes = new JSONObject(getExtensionJobDetails(TEST_JOB)).get("processes").toString(); + String processes = new JSONObject(getExtensionJobDetails(TEST_JOB).getMessage()).get("processes").toString(); Assert.assertEquals(processes, "sample"); process = (Process) getClient().getDefinition(EntityType.PROCESS.toString(), "sample", null); Assert.assertEquals(process.getPipelines(), "testSample"); @@ -491,6 +490,12 @@ public class TestFalconUnit extends FalconUnitTestBase { assertStatus(apiResult); Assert.assertEquals(apiResult.getMessage(), "RUNNING"); + try { + unregisterExtension(TEST_EXTENSION); + Assert.fail("Should have thrown a FalconCLIException"); + } catch (FalconWebException e) { + //Do nothing. Exception expected as there are dependent extension jobs and so extension cannot be deleted. + } apiResult = deleteExtensionJob(TEST_JOB, null); assertStatus(apiResult); try { @@ -506,14 +511,10 @@ public class TestFalconUnit extends FalconUnitTestBase { Assert.assertEquals(((APIResult) e.getResponse().getEntity()).getMessage(), "Job name not found:testJob"); //Do nothing. Exception Expected. } + apiResult = unregisterExtension(TEST_EXTENSION); + assertStatus(apiResult); } - @Test - public void testExtensionJobSuspendAndResume() throws Exception { - - } - - private void copyExtensionJar(String destDirPath) throws IOException { File dir = new File(new Path(JARS_DIR).toUri().toURL().getPath()); for (File file : dir.listFiles()) {
