Repository: falcon Updated Branches: refs/heads/master 2f1aa291f -> c36a1397c
FALCON-2195 Extension Job Details changes Author: sandeep <[email protected]> Reviewers: @pallavi-rao Closes #315 from sandeepSamudrala/FALCON-2195 and squashes the following commits: 350957f [sandeep] FALCON-2195 Rebased my patch to resolve merge conflicts 5f7e209 [sandeep] FALCON-2195 Added getExtensionJobDetails to falcon unit and updated test falcon unit to test the same 8f209ea [sandeep] FALCON-2195 Extension Job Details changes dc9090e [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2195 f96a084 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 9cf36e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon bbca081 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 48f6afa [sandeep] Merge branch 'master' of https://github.com/apache/falcon 250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon 1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon c065566 [sandeep] reverting last line changes made 1a4dcd2 [sandeep] rebased and resolved the conflicts from master 271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay. a94d4fe [sandeep] rebasing from master 9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/c36a1397 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/c36a1397 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/c36a1397 Branch: refs/heads/master Commit: c36a1397cee55f4ba501c45f9580fc42472e441e Parents: 2f1aa29 Author: sandeep <[email protected]> Authored: Tue Dec 13 10:40:00 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Tue Dec 13 10:40:00 2016 +0530 ---------------------------------------------------------------------- .../apache/falcon/cli/FalconExtensionCLI.java | 12 +++- .../falcon/client/AbstractFalconClient.java | 7 +++ .../org/apache/falcon/client/FalconClient.java | 7 +++ .../extensions/jdbc/ExtensionMetaStore.java | 18 ++++++ .../extensions/jdbc/ExtensionMetaStoreTest.java | 1 + .../resource/extensions/ExtensionManager.java | 63 ++++++++++++++++---- .../apache/falcon/unit/FalconUnitClient.java | 7 ++- .../falcon/unit/LocalExtensionManager.java | 4 ++ .../apache/falcon/unit/FalconUnitTestBase.java | 4 ++ .../org/apache/falcon/unit/TestFalconUnit.java | 4 ++ 10 files changed, 113 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/c36a1397/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java ---------------------------------------------------------------------- diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java index 57871c3..c8c66bf 100644 --- a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java +++ b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java @@ -90,9 +90,15 @@ public class FalconExtensionCLI { validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); result = client.unregisterExtension(extensionName); } else if (optionsList.contains(DETAIL_OPT)) { - validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); - result = client.getExtensionDetail(extensionName); - result = prettyPrintJson(result); + if (optionsList.contains(JOB_NAME_OPT)) { + validateRequiredParameter(jobName, JOB_NAME_OPT); + result = client.getExtensionJobDetails(jobName); + result = prettyPrintJson(result); + } else { + validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); + result = client.getExtensionDetail(extensionName); + result = prettyPrintJson(result); + } } else if (optionsList.contains(FalconCLIConstants.SUBMIT_OPT)) { validateRequiredParameter(extensionName, EXTENSION_NAME_OPT); validateRequiredParameter(jobName, JOB_NAME_OPT); http://git-wip-us.apache.org/repos/asf/falcon/blob/c36a1397/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java index 9659cfc..fc6bc14 100644 --- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java @@ -222,6 +222,13 @@ public abstract class AbstractFalconClient { String doAsUser); /** + * Prepares set of entities the extension has implemented to validate the extension job. + * @param jobName job name of the extension job. + * @return + */ + public abstract String getExtensionJobDetails(final String jobName); + + /** * * Get list of the entities. * We have two filtering parameters for entity tags: "tags" and "tagkeys". http://git-wip-us.apache.org/repos/asf/falcon/blob/c36a1397/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index 7fa5330..0ccbe48 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -357,6 +357,7 @@ public class FalconClient extends AbstractFalconClient { DELETE("api/extension/delete", HttpMethod.POST, MediaType.TEXT_XML), UNREGISTER("api/extension/unregister/", HttpMethod.POST, MediaType.TEXT_PLAIN), DETAIL("api/extension/detail/", HttpMethod.GET, MediaType.APPLICATION_JSON), + JOB_DETAILS("api/extension/extensionJobDetails/", HttpMethod.GET, MediaType.APPLICATION_JSON), REGISTER("api/extension/register/", HttpMethod.POST, MediaType.TEXT_PLAIN); private String path; @@ -1029,6 +1030,12 @@ public class FalconClient extends AbstractFalconClient { return getResponse(String.class, getExtensionDetailResponse(extensionName)); } + public String getExtensionJobDetails(final String jobName) { + ClientResponse clientResponse = new ResourceBuilder().path(ExtensionOperations.JOB_DETAILS.path, jobName) + .call(ExtensionOperations.JOB_DETAILS); + return getResponse(String.class, clientResponse); + } + public ClientResponse getExtensionDetailResponse(final String extensionName) { return new ResourceBuilder().path(ExtensionOperations.DETAIL.path, extensionName) .call(ExtensionOperations.DETAIL); http://git-wip-us.apache.org/repos/asf/falcon/blob/c36a1397/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java index 456c97c..882582f 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java @@ -154,6 +154,24 @@ public class ExtensionMetaStore { } } + public ExtensionJobsBean getExtensionJobDetails(String jobName) { + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + Query query = entityManager.createNamedQuery(PersistenceConstants.GET_EXTENSION_JOB); + query.setParameter(JOB_NAME, jobName); + List<ExtensionJobsBean> jobsBeanList; + try { + jobsBeanList = query.getResultList(); + } finally { + commitAndCloseTransaction(entityManager); + } + if (jobsBeanList != null && !jobsBeanList.isEmpty()) { + return jobsBeanList.get(0); + } else { + return null; + } + } + public List<ExtensionJobsBean> getAllExtensionJobs() { EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); http://git-wip-us.apache.org/repos/asf/falcon/blob/c36a1397/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java ---------------------------------------------------------------------- diff --git a/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java b/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java index d96fc1f..099b5d1 100644 --- a/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java +++ b/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java @@ -83,6 +83,7 @@ public class ExtensionMetaStoreTest extends AbstractTestExtensionStore { stateStore.storeExtensionJob("job1", "test2", feeds, processes, config); Assert.assertEquals(stateStore.getAllExtensionJobs().size(), 1); + Assert.assertEquals(stateStore.getExtensionJobDetails("job1").getFeeds().get(0), "testFeed"); stateStore.deleteExtensionJob("job1"); Assert.assertEquals(stateStore.getAllExtensionJobs().size(), 0); } http://git-wip-us.apache.org/repos/asf/falcon/blob/c36a1397/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java index 47ee737..9a7daa5 100644 --- a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java @@ -39,6 +39,7 @@ import org.apache.falcon.extensions.ExtensionType; import org.apache.falcon.extensions.jdbc.ExtensionMetaStore; import org.apache.falcon.extensions.store.ExtensionStore; import org.apache.falcon.persistence.ExtensionBean; +import org.apache.falcon.persistence.ExtensionJobsBean; import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.AbstractSchedulableEntityManager; import org.apache.falcon.resource.EntityList; @@ -90,13 +91,23 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { private static final String DESCENDING_SORT_ORDER = "desc"; private Extension extension = new Extension(); + private static final String EXTENSION_RESULTS = "extensions"; private static final String TOTAL_RESULTS = "totalResults"; private static final String README = "README"; - private static final String EXTENSION_NAME = "name"; + private static final String NAME = "name"; private static final String EXTENSION_TYPE = "type"; private static final String EXTENSION_DESC = "description"; private static final String EXTENSION_LOCATION = "location"; + private static final String JOB_NAME = "jobName"; + + private static final String EXTENSION_NAME = "extensionName"; + private static final String FEEDS = "feeds"; + private static final String PROCESSES = "processes"; + private static final String CONFIG = "config"; + private static final String CREATION_TIME = "creationTime"; + private static final String LAST_UPDATE_TIME = "lastUpdatedTime"; + private static final String EXTENSION_PROPERTY_JSON_SUFFIX = "-properties.json"; //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck @@ -357,7 +368,7 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { private ExtensionType getExtensionType(String extensionName) { - ExtensionMetaStore metaStore = ExtensionStore.get().getMetaStore(); + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); ExtensionBean extensionDetails = metaStore.getDetail(extensionName); return extensionDetails.getExtensionType(); } @@ -549,16 +560,27 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { @GET @Path("detail/{extension-name}") @Produces({MediaType.APPLICATION_JSON}) - public Response getDetail(@PathParam("extension-name") String extensionName){ + public Response getDetail(@PathParam("extension-name") String extensionName) { checkIfExtensionServiceIsEnabled(); validateExtensionName(extensionName); try { - return Response.ok(buildDetailResult(extensionName)).build(); + return Response.ok(buildExtensionDetailResult(extensionName)).build(); } catch (Throwable e) { throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); } } + @GET + @Path("extensionJobDetails/{job-name}") + @Produces({MediaType.APPLICATION_JSON}) + public String getExtensionJobDetail(@PathParam("job-name") String jobName) { + checkIfExtensionServiceIsEnabled(); + try { + return buildExtensionJobDetailResult(jobName).toString(); + } catch (FalconException e) { + throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + } @POST @Path("unregister/{extension-name}") @@ -582,7 +604,7 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { public String registerExtensionMetadata( @PathParam("extension-name") String extensionName, @QueryParam("path") String path, - @QueryParam("description") String description){ + @QueryParam("description") String description) { checkIfExtensionServiceIsEnabled(); validateExtensionName(extensionName); try { @@ -616,13 +638,13 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { private static JSONArray buildEnumerateResult() throws FalconException { JSONArray results = new JSONArray(); - ExtensionMetaStore metaStore = ExtensionStore.get().getMetaStore(); + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); List<ExtensionBean> extensionBeanList = metaStore.getAllExtensions(); for (ExtensionBean extensionBean : extensionBeanList) { JSONObject resultObject = new JSONObject(); try { - resultObject.put(EXTENSION_NAME, extensionBean.getExtensionName().toLowerCase()); + resultObject.put(NAME, extensionBean.getExtensionName().toLowerCase()); resultObject.put(EXTENSION_TYPE, extensionBean.getExtensionType()); resultObject.put(EXTENSION_DESC, extensionBean.getDescription()); resultObject.put(EXTENSION_LOCATION, extensionBean.getLocation()); @@ -649,8 +671,29 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { return entities; } - private JSONObject buildDetailResult(final String extensionName) throws FalconException { - ExtensionMetaStore metaStore = ExtensionStore.get().getMetaStore(); + private JSONObject buildExtensionJobDetailResult(final String jobName) throws FalconException { + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); + ExtensionJobsBean jobsBean = metaStore.getExtensionJobDetails(jobName); + if (jobsBean == null) { + throw new ValidationException("Job name not found:" + jobName); + } + JSONObject detailsObject = new JSONObject(); + try { + detailsObject.put(JOB_NAME, jobsBean.getJobName()); + detailsObject.put(EXTENSION_NAME, jobsBean.getExtensionName()); + detailsObject.put(FEEDS, StringUtils.join(jobsBean.getFeeds(), ",")); + detailsObject.put(PROCESSES, StringUtils.join(jobsBean.getProcesses(), ",")); + detailsObject.put(CONFIG, jobsBean.getConfig()); + detailsObject.put(CREATION_TIME, jobsBean.getCreationTime()); + detailsObject.put(LAST_UPDATE_TIME, jobsBean.getLastUpdatedTime()); + } catch (JSONException e) { + LOG.error("Exception while building extension jon details for job {}", jobName, e); + } + return detailsObject; + } + + private JSONObject buildExtensionDetailResult(final String extensionName) throws FalconException { + ExtensionMetaStore metaStore = ExtensionStore.getMetaStore(); if (!metaStore.checkIfExtensionExists(extensionName)){ throw new ValidationException("No extension resources found for " + extensionName); @@ -659,7 +702,7 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { ExtensionBean bean = metaStore.getDetail(extensionName); JSONObject resultObject = new JSONObject(); try { - resultObject.put(EXTENSION_NAME, bean.getExtensionName()); + resultObject.put(NAME, bean.getExtensionName()); resultObject.put(EXTENSION_TYPE, bean.getExtensionType()); resultObject.put(EXTENSION_DESC, bean.getDescription()); resultObject.put(EXTENSION_LOCATION, bean.getLocation()); http://git-wip-us.apache.org/repos/asf/falcon/blob/c36a1397/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java index 4da9f73..c9e1d4c 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java @@ -310,7 +310,7 @@ public class FalconUnitClient extends AbstractFalconClient { } private List<Entity> getEntities(String extensionName, String jobName, InputStream configStream) { - String packagePath = ExtensionStore.get().getMetaStore().getDetail(extensionName).getLocation(); + String packagePath = ExtensionStore.getMetaStore().getDetail(extensionName).getLocation(); List<Entity> entities; try { entities = ExtensionHandler.loadAndPrepare(extensionName, jobName, configStream, @@ -335,6 +335,11 @@ public class FalconUnitClient extends AbstractFalconClient { } @Override + public String getExtensionJobDetails(final String jobName) { + return localExtensionManager.getExtensionJobDetails(jobName); + } + + @Override public EntityList getEntityList(String entityType, String fields, String nameSubsequence, String tagKeywords, String filterBy, String filterTags, String orderBy, String sortOrder, Integer offset, Integer numResults, String doAsUser) { http://git-wip-us.apache.org/repos/asf/falcon/blob/c36a1397/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java index 553e7d6..da486db 100644 --- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java +++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java @@ -58,4 +58,8 @@ public class LocalExtensionManager extends ExtensionManager { return super.deleteExtensionMetadata(extensionName); } + public String getExtensionJobDetails(String jobName){ + return super.getExtensionJobDetail(jobName); + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/c36a1397/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java index 4ed7161..6f71747 100644 --- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java +++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java @@ -224,6 +224,10 @@ public class FalconUnitTestBase { return falconUnitClient.registerExtension(extensionName, packagePath, description); } + public String getExtensionJobDetails(String jobName) { + return falconUnitClient.getExtensionJobDetails(jobName); + } + public String unregisterExtension(String extensionName) { return falconUnitClient.unregisterExtension(extensionName); } http://git-wip-us.apache.org/repos/asf/falcon/blob/c36a1397/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java index 3555b22..b7a6e39 100644 --- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java +++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java @@ -32,6 +32,7 @@ import org.apache.falcon.resource.InstancesSummaryResult; import org.apache.falcon.service.FalconJPAService; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.codehaus.jettison.json.JSONObject; import org.testng.Assert; import org.testng.annotations.Test; @@ -426,6 +427,9 @@ public class TestFalconUnit extends FalconUnitTestBase { copyExtensionJar(packageBuildLib); APIResult apiResult = submitAndScheduleExtensionJob("testExtension", "testJob", null, null); assertStatus(apiResult); + result = getExtensionJobDetails("testJob"); + JSONObject resultJson = new JSONObject(result); + Assert.assertEquals(resultJson.get("extensionName"), "testExtension"); apiResult = getClient().getStatus(EntityType.PROCESS, "sample", CLUSTER_NAME, null, false); assertStatus(apiResult);
