Repository: falcon Updated Branches: refs/heads/master 4a7d20eeb -> 0657257d8
FALCON-1643 Add CLI option to display captured replication metrics. Contributed by Peeyush Bishnoi. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/e945d2b4 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/e945d2b4 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/e945d2b4 Branch: refs/heads/master Commit: e945d2b40dfab09ecf14eae2d506e704b103936a Parents: 4a7d20e Author: Ajay Yadava <[email protected]> Authored: Tue Jan 12 23:08:29 2016 +0530 Committer: Ajay Yadava <[email protected]> Committed: Tue Jan 12 23:08:29 2016 +0530 ---------------------------------------------------------------------- CHANGES.txt | 17 ++- .../java/org/apache/falcon/cli/FalconCLI.java | 2 + .../apache/falcon/cli/FalconMetadataCLI.java | 40 ++++++- .../org/apache/falcon/client/FalconClient.java | 39 +++++++ .../falcon/metadata/RelationshipType.java | 3 +- docs/src/site/twiki/FalconCLI.twiki | 13 ++- .../metadata/MetadataDiscoveryResource.java | 113 +++++++++++++++++++ .../metadata/MetadataDiscoveryResourceTest.java | 15 +++ .../resource/metadata/MetadataTestContext.java | 20 ++++ 9 files changed, 254 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/e945d2b4/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index fbd5332..0b1215d 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,15 @@ Apache Falcon Change log +Trunk + TASKS: + INCOMPATIBLE CHANGES + NEW FEATURES + FALCON-1495 In instance status list, show all runs for instances when requested by user(Narayan Periwal via Ajay Yadava) + + FALCON-1230 Data based notification Service to notify execution instances when data becomes available(Pavan Kumar Kolamuri via Ajay Yadava) + IMPROVEMENTS + BUG FIXES + + Proposed Release Version: 0.9 TASKS: FALCON-1718 Change versions in pom.xml of master and 0.9 branch(Pallavi Rao via Ajay Yadava) @@ -12,9 +23,7 @@ Proposed Release Version: 0.9 INCOMPATIBLE CHANGES NEW FEATURES - FALCON-1495 In instance status list, show all runs for instances when requested by user(Narayan Periwal via Ajay Yadava) - - FALCON-1230 Data based notification Service to notify execution instances when data becomes available(Pavan Kumar Kolamuri via Ajay Yadava) + FALCON-1643 Add CLI option to display captured replication metrics(Peeyush Bishnoi via Ajay Yadava) FALCON-1679 API to get type of scheduler(native/oozie) (Pallavi Rao) @@ -33,8 +42,6 @@ Proposed Release Version: 0.9 FALCON-1596 Spring shell based CLI for Falcon FALCON-1608 Base framework for Spring Shell based shell for Falcon (Rajat Khandelwal via Ajay Yadava) - FALCON-1480 Gather data transfer details of Hive DR. (Peeyush Bishnoi via Ajay Yadava) - FALCON-1234 State Store for instances scheduled by Falcon (Pavan Kolamuri via Pallavi Rao) FALCON-1480 Gather data transfer details of Hive DR(Peeyush Bishnoi via Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/e945d2b4/client/src/main/java/org/apache/falcon/cli/FalconCLI.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java index 24f230a..a1f42ce 100644 --- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java +++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java @@ -61,6 +61,8 @@ public class FalconCLI { public static final String TYPE_OPT = "type"; public static final String COLO_OPT = "colo"; public static final String CLUSTER_OPT = "cluster"; + public static final String FEED_OPT = "feed"; + public static final String PROCESS_OPT = "process"; public static final String ENTITY_NAME_OPT = "name"; public static final String FILE_PATH_OPT = "file"; public static final String VERSION_OPT = "version"; http://git-wip-us.apache.org/repos/asf/falcon/blob/e945d2b4/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java index 36dd613..6487d41 100644 --- a/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java +++ b/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java @@ -25,6 +25,7 @@ import org.apache.commons.cli.Options; import org.apache.commons.lang3.StringUtils; import org.apache.falcon.client.FalconCLIException; import org.apache.falcon.client.FalconClient; +import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.metadata.RelationshipType; import java.io.PrintStream; @@ -85,6 +86,10 @@ public class FalconMetadataCLI extends FalconCLI { Option type = new Option(TYPE_OPT, true, "Dimension type"); Option name = new Option(NAME_OPT, true, "Dimension name"); Option cluster = new Option(CLUSTER_OPT, true, "Cluster name"); + Option feed = new Option(FEED_OPT, true, "Feed Entity name"); + Option process = new Option(PROCESS_OPT, true, "Process Entity name"); + Option numResults = new Option(NUM_RESULTS_OPT, true, + "Number of results to return per request"); // Add lineage options metadataOptions.addOption(pipeline); @@ -93,6 +98,9 @@ public class FalconMetadataCLI extends FalconCLI { metadataOptions.addOption(type); metadataOptions.addOption(cluster); metadataOptions.addOption(name); + metadataOptions.addOption(feed); + metadataOptions.addOption(process); + metadataOptions.addOption(numResults); Option vertex = new Option(VERTEX_CMD, false, "show the vertices"); Option vertices = new Option(VERTICES_CMD, false, "show the vertices"); @@ -129,6 +137,8 @@ public class FalconMetadataCLI extends FalconCLI { String result; String dimensionType = commandLine.getOptionValue(TYPE_OPT); String cluster = commandLine.getOptionValue(CLUSTER_OPT); + String feed = commandLine.getOptionValue(FEED_OPT); + String process = commandLine.getOptionValue(PROCESS_OPT); String dimensionName = commandLine.getOptionValue(NAME_OPT); String id = commandLine.getOptionValue(ID_OPT); String key = commandLine.getOptionValue(KEY_OPT); @@ -136,13 +146,31 @@ public class FalconMetadataCLI extends FalconCLI { String direction = commandLine.getOptionValue(DIRECTION_OPT); String pipeline = commandLine.getOptionValue(PIPELINE_OPT); String doAsUser = commandLine.getOptionValue(FalconCLI.DO_AS_OPT); + Integer numResults = parseIntegerInput(commandLine.getOptionValue(NUM_RESULTS_OPT), null, "numResults"); if (optionsList.contains(LINEAGE_OPT)) { validatePipelineName(pipeline); result = client.getEntityLineageGraph(pipeline, doAsUser).getDotNotation(); } else if (optionsList.contains(LIST_OPT)) { validateDimensionType(dimensionType.toUpperCase()); - result = client.getDimensionList(dimensionType, cluster, doAsUser); + if (!(dimensionType.toUpperCase()) + .equals(RelationshipType.REPLICATION_METRICS.name())) { + result = client.getDimensionList(dimensionType, cluster, doAsUser); + } else { + String schedEntityType = null; + String schedEntityName = null; + if (StringUtils.isNotEmpty(feed)) { + schedEntityType = EntityType.getEnum(FEED_OPT).name(); + schedEntityName = feed; + } else if (StringUtils.isNotEmpty(process)) { + schedEntityType = EntityType.getEnum(PROCESS_OPT).name(); + schedEntityName = process; + } + validateScheduleEntity(schedEntityType, schedEntityName); + + result = client.getReplicationMetricsDimensionList(schedEntityType, schedEntityName, + numResults, doAsUser); + } } else if (optionsList.contains(RELATIONS_OPT)) { validateDimensionType(dimensionType.toUpperCase()); validateDimensionName(dimensionName, RELATIONS_OPT); @@ -190,6 +218,16 @@ public class FalconMetadataCLI extends FalconCLI { } } + private void validateScheduleEntity(String schedEntityType, String schedEntityName) throws FalconCLIException { + if (StringUtils.isBlank(schedEntityType)) { + throw new FalconCLIException("Entity must be schedulable type : -feed/process"); + } + + if (StringUtils.isBlank(schedEntityName)) { + throw new FalconCLIException("Entity name is missing"); + } + } + private void validateId(String id) throws FalconCLIException { if (id == null || id.length() == 0) { throw new FalconCLIException("Missing argument: id"); http://git-wip-us.apache.org/repos/asf/falcon/blob/e945d2b4/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index 8e2ec40..3f3a871 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -32,6 +32,7 @@ import org.apache.falcon.cli.FalconMetadataCLI; import org.apache.falcon.entity.v0.DateValidator; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.metadata.RelationshipType; import org.apache.falcon.recipe.RecipeTool; import org.apache.falcon.recipe.RecipeToolArgs; import org.apache.falcon.resource.APIResult; @@ -629,6 +630,12 @@ public class FalconClient extends AbstractFalconClient { return sendMetadataDiscoveryRequest(MetadataOperations.LIST, dimensionType, null, cluster, doAsUser); } + public String getReplicationMetricsDimensionList(String schedEntityType, String schedEntityName, + Integer numResults, String doAsUser) throws FalconCLIException { + return sendRequestForReplicationMetrics(MetadataOperations.LIST, + schedEntityType, schedEntityName, numResults, doAsUser); + } + public LineageGraphResult getEntityLineageGraph(String pipelineName, String doAsUser) throws FalconCLIException { MetadataOperations operation = MetadataOperations.LINEAGE; @@ -1079,6 +1086,38 @@ public class FalconClient extends AbstractFalconClient { return clientResponse.getEntity(String.class); } + private String sendRequestForReplicationMetrics(final MetadataOperations operation, final String schedEntityType, + final String schedEntityName, Integer numResults, + final String doAsUser) throws FalconCLIException { + WebResource resource = service.path(operation.path) + .path(schedEntityName) + .path(RelationshipType.REPLICATION_METRICS.getName()) + .path(FalconMetadataCLI.LIST_OPT); + + if (StringUtils.isNotEmpty(schedEntityName)) { + resource = resource.queryParam(FalconCLI.TYPE_OPT, schedEntityType); + } + + if (numResults != null) { + resource = resource.queryParam(FalconCLI.NUM_RESULTS_OPT, numResults.toString()); + } + + if (StringUtils.isNotEmpty(doAsUser)) { + resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser); + } + + ClientResponse clientResponse = resource + .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) + .accept(operation.mimeType).type(operation.mimeType) + .method(operation.method, ClientResponse.class); + + printClientResponse(clientResponse); + + checkIfSuccessful(clientResponse); + return clientResponse.getEntity(String.class); + + } + private String sendMetadataDiscoveryRequest(final MetadataOperations operation, final String dimensionType, final String dimensionName, http://git-wip-us.apache.org/repos/asf/falcon/blob/e945d2b4/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java b/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java index 8e5f8ea..6624319 100644 --- a/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java +++ b/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java @@ -39,7 +39,8 @@ public enum RelationshipType { COLO("data-center"), TAGS("classification"), GROUPS("group"), - PIPELINES("pipelines"); + PIPELINES("pipelines"), + REPLICATION_METRICS("replication-metrics"); private final String name; http://git-wip-us.apache.org/repos/asf/falcon/blob/e945d2b4/docs/src/site/twiki/FalconCLI.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki index 667d85b..c62d56d 100644 --- a/docs/src/site/twiki/FalconCLI.twiki +++ b/docs/src/site/twiki/FalconCLI.twiki @@ -471,7 +471,7 @@ $FALCON_HOME/bin/falcon metadata -edge -id Q9n-Q-5g Lists of all dimensions of given type. If the user provides optional param cluster, only the dimensions related to the cluster are listed. Usage: -$FALCON_HOME/bin/falcon metadata -list -type [cluster_entity|datasource_entity|feed_entity|process_entity|user|colo|tags|groups|pipelines] +$FALCON_HOME/bin/falcon metadata -list -type [cluster_entity|datasource_entity|feed_entity|process_entity|user|colo|tags|groups|pipelines|replication_metrics] Optional Args : -cluster <<cluster name>> @@ -479,6 +479,17 @@ Example: $FALCON_HOME/bin/falcon metadata -list -type process_entity -cluster primary-cluster $FALCON_HOME/bin/falcon metadata -list -type tags + +To display replication metrics from recipe based replication process and from feed replication. +Usage: +$FALCON_HOME/bin/falcon metadata -list -type replication_metrics -process/-feed <entity name> +Optional Args : -numResults <<value>> + +Example: +$FALCON_HOME/bin/falcon metadata -list -type replication_metrics -process hdfs-replication +$FALCON_HOME/bin/falcon metadata -list -type replication_metrics -feed fs-replication + + ---+++ Relations List all dimensions related to specified Dimension identified by dimension-type and dimension-name. http://git-wip-us.apache.org/repos/asf/falcon/blob/e945d2b4/prism/src/main/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResource.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResource.java b/prism/src/main/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResource.java index 0f7701c..4cd9585 100644 --- a/prism/src/main/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResource.java +++ b/prism/src/main/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResource.java @@ -26,6 +26,8 @@ import com.tinkerpop.blueprints.util.io.graphson.GraphSONMode; import com.tinkerpop.blueprints.util.io.graphson.GraphSONUtility; import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconWebException; +import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.job.ReplicationJobCountersList; import org.apache.falcon.metadata.RelationshipLabel; import org.apache.falcon.metadata.RelationshipProperty; import org.apache.falcon.metadata.RelationshipType; @@ -40,7 +42,11 @@ import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; +import java.util.List; /** * Jersey Resource for metadata operations. @@ -90,6 +96,58 @@ public class MetadataDiscoveryResource extends AbstractMetadataResource { } /** + * Get list of dimensions for the replication metrics. + * <p/> + * GET http://host/metadata/discovery/replication-metrics/list + * @param schedEntityType Type of the schedulable entity + * @param schedEntityName Name of the schedulable entity. + * @param numResults limit the number of metrics to return sorted in ascending order. + * @return List of dimensions that match requested type [and cluster]. + */ + @GET + @Path("/{name}/replication-metrics/list") + @Produces({MediaType.APPLICATION_JSON}) + public Response listReplicationMetricsDimensionValues(@PathParam("name") final String schedEntityName, + @QueryParam("type") final String schedEntityType, + @QueryParam("numResults") Integer numResults) { + JSONArray dimensionValues = new JSONArray(); + int resultsPerQuery = numResults == null ? 10 : numResults; + if (StringUtils.isNotBlank(schedEntityName)) { + try { + EntityUtil.getEntity(schedEntityType, schedEntityName); + } catch (Throwable e) { + throw FalconWebException.newAPIException(e, Response.Status.BAD_REQUEST); + } + dimensionValues = getReplicationEntityDimensionValues(schedEntityName, resultsPerQuery); + } + + try { + JSONObject response = new JSONObject(); + response.put(RESULTS, dimensionValues); + response.put(TOTAL_SIZE, dimensionValues.length()); + return Response.ok(response).build(); + } catch (JSONException e) { + throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + } + + + public JSONArray getReplicationEntityDimensionValues(final String schedEntityName, + final int resultsPerQuery) { + // Get schedule entity Vertex, get adjacent vertices of schedule entity Vertex that match dimension type. + Vertex schedEntityVertex = getVertexByName(schedEntityName); + if (schedEntityVertex == null) { + return new JSONArray(); + } + try { + Iterator<Edge> inEdges = schedEntityVertex.query().direction(Direction.IN).edges().iterator(); + return getAdjacentVerticesForVertexMetrics(inEdges, Direction.OUT, resultsPerQuery); + } catch (JSONException e) { + throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + } + + /** * Get relations of a dimension identified by type and name. * * GET http://host/metadata/discovery/dimension-type/dimension-name/relations @@ -148,11 +206,66 @@ public class MetadataDiscoveryResource extends AbstractMetadataResource { return adjVertices; } + private JSONArray getAdjacentVerticesForVertexMetrics(Iterator<Edge> edges, Direction direction, + int resultsPerQuery) throws JSONException { + JSONArray adjVertices = new JSONArray(); + Iterator<Edge> sortedEdge = sortEdgesById(edges, direction); + while (sortedEdge.hasNext() && resultsPerQuery!=0) { + Edge edge = sortedEdge.next(); + Vertex vertex = edge.getVertex(direction); + if (vertex.getProperty(ReplicationJobCountersList.BYTESCOPIED.getName()) != null) { + JSONObject vertexObject = new JSONObject(); + vertexObject.put(RelationshipProperty.NAME.getName(), + vertex.getProperty(RelationshipProperty.NAME.getName())); + vertexObject.put(ReplicationJobCountersList.TIMETAKEN.getName(), + vertex.getProperty(ReplicationJobCountersList.TIMETAKEN.getName())); + vertexObject.put(ReplicationJobCountersList.BYTESCOPIED.getName(), + vertex.getProperty(ReplicationJobCountersList.BYTESCOPIED.getName())); + vertexObject.put(ReplicationJobCountersList.COPY.getName(), + vertex.getProperty(ReplicationJobCountersList.COPY.getName())); + adjVertices.put(vertexObject); + + resultsPerQuery--; + } + } + + return adjVertices; + } + + Iterator<Edge> sortEdgesById(Iterator<Edge> edges, final Direction direction) { + List<Edge> edgeList = new ArrayList<Edge>(); + + while(edges.hasNext()) { + Edge e = edges.next(); + edgeList.add(e); + } + + Collections.sort(edgeList, new Comparator<Edge>() { + @Override + public int compare(Edge e1, Edge e2) { + long l1 = (long)e1.getVertex(direction).getId(); + long l2 = (long)e2.getVertex(direction).getId(); + + return l1 > l2 ? -1 : 1; + } + }); + + return edgeList.iterator(); + } + private String getVertexRelationshipType(Vertex vertex) { String type = vertex.getProperty(RelationshipProperty.TYPE.getName()); return RelationshipType.fromString(type).toString(); } + private Vertex getVertexByName(String name) { + Iterator<Vertex> vertexIterator = getGraph().query() + .has(RelationshipProperty.NAME.getName(), name) + .vertices().iterator(); + + return vertexIterator.hasNext() ? vertexIterator.next() : null; + } + private Vertex getVertex(String name, String type) { Iterator<Vertex> vertexIterator = getGraph().query() .has(RelationshipProperty.TYPE.getName(), type) http://git-wip-us.apache.org/repos/asf/falcon/blob/e945d2b4/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResourceTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResourceTest.java b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResourceTest.java index 84ada9a..bbe1c81 100644 --- a/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResourceTest.java +++ b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResourceTest.java @@ -19,6 +19,7 @@ package org.apache.falcon.resource.metadata; import org.apache.falcon.FalconWebException; +import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.metadata.RelationshipType; import org.json.simple.JSONValue; import org.testng.Assert; @@ -150,6 +151,20 @@ public class MetadataDiscoveryResourceTest { Assert.assertEquals(Integer.parseInt(results.get(MetadataDiscoveryResource.TOTAL_SIZE).toString()), 0); } + @Test + public void testListDimensionsMetrics() throws Exception { + MetadataDiscoveryResource resource = new MetadataDiscoveryResource(); + Response response = resource.listReplicationMetricsDimensionValues("sample-process", + EntityType.PROCESS.name(), 5); + Map results = (Map) JSONValue.parse(response.getEntity().toString()); + Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode()); + List metrics = (List) results.get(MetadataDiscoveryResource.RESULTS); + Assert.assertEquals(metrics.size(), 1); + Assert.assertTrue(metrics.get(0).toString().contains("BYTESCOPIED")); + Assert.assertTrue(metrics.get(0).toString().contains("COPY")); + } + + @Test(expectedExceptions = FalconWebException.class) public void testListInvalidDimensionType() throws Exception { MetadataDiscoveryResource resource = new MetadataDiscoveryResource(); http://git-wip-us.apache.org/repos/asf/falcon/blob/e945d2b4/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java index 05cc2e9..a382d85 100644 --- a/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java +++ b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java @@ -32,15 +32,20 @@ import org.apache.falcon.entity.v0.feed.Location; import org.apache.falcon.entity.v0.feed.LocationType; import org.apache.falcon.entity.v0.feed.Locations; import org.apache.falcon.entity.v0.process.EngineType; +import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.metadata.MetadataMappingService; +import org.apache.falcon.metadata.MetadataMappingServiceTest; import org.apache.falcon.security.CurrentUser; import org.apache.falcon.service.Services; import org.apache.falcon.util.StartupProperties; import org.apache.falcon.workflow.WorkflowExecutionArgs; import org.apache.falcon.workflow.WorkflowExecutionContext; import org.apache.falcon.workflow.WorkflowJobEndNotificationService; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.testng.Assert; +import java.io.OutputStream; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -194,6 +199,7 @@ public class MetadataTestContext { } public void addInstance() throws Exception { + createJobCountersFileForTest(); WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(), WorkflowExecutionContext.Type.POST_PROCESSING); service.onSuccess(context); @@ -220,6 +226,20 @@ public class MetadataTestContext { } } + private void createJobCountersFileForTest() throws Exception { + Path counterFile = new Path(LOGS_DIR, "counter.txt"); + OutputStream out = null; + try { + FileSystem fs = HadoopClientFactory.get().createFalconFileSystem( + new Path(LOGS_DIR).toUri()); + out = fs.create(counterFile); + out.write((MetadataMappingServiceTest.COUNTERS).getBytes()); + out.flush(); + } finally { + out.close(); + } + } + private static String[] getTestMessageArgs() { return new String[]{ "-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), CLUSTER_ENTITY_NAME,
