Repository: falcon Updated Branches: refs/heads/master 5e270741b -> f4bb375e5
FALCON-1377 Add tests in falcon for the Triage API. Contributed by Karishma Gulati. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/f4bb375e Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/f4bb375e Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/f4bb375e Branch: refs/heads/master Commit: f4bb375e5aecff82ce2c2177bb4f08d0352d4289 Parents: 5e27074 Author: Pragya <[email protected]> Authored: Thu Dec 24 09:31:15 2015 +0000 Committer: Pragya <[email protected]> Committed: Thu Dec 24 09:31:15 2015 +0000 ---------------------------------------------------------------------- falcon-regression/CHANGES.txt | 2 + .../core/enumsAndConstants/ResponseErrors.java | 3 +- .../falcon/regression/core/util/AssertUtil.java | 18 +- .../regression/core/util/EntityLineageUtil.java | 37 +- .../triage/TriageAPISingleColoTest.java | 447 +++++++++++++++++++ 5 files changed, 496 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bb375e/falcon-regression/CHANGES.txt ---------------------------------------------------------------------- diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt index d285661..4d58035 100644 --- a/falcon-regression/CHANGES.txt +++ b/falcon-regression/CHANGES.txt @@ -5,6 +5,8 @@ Trunk (Unreleased) INCOMPATIBLE CHANGES NEW FEATURES + FALCON-1377 Add tests in Falcon for the Triage API(Karishma Gulati via Pragya Mittal) + FALCON-1546 Add ProcessUpdateTest, PipelineInstanceDependencyTest and other tests and test fixes (Raghav Gautam and Paul Isaychuk via Paul Isaychuk) http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bb375e/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/enumsAndConstants/ResponseErrors.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/enumsAndConstants/ResponseErrors.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/enumsAndConstants/ResponseErrors.java index 921a303..b16edf6 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/enumsAndConstants/ResponseErrors.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/enumsAndConstants/ResponseErrors.java @@ -27,7 +27,8 @@ public enum ResponseErrors { PROCESS_INVALID_RANGE("is not in validity range of process"), PROCESS_INSTANCE_FAULT("is not a valid instance time on cluster"), FEED_INVALID_RANGE("is not in validity range for Feed"), - FEED_INSTANCE_FAULT("is not a valid instance for the feed"); + FEED_INSTANCE_FAULT("is not a valid instance for the feed"), + INVALID_INSTANCE_TIME("not a valid instance"); private String error; http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bb375e/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java index d8df0fb..9d3b802 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java @@ -96,7 +96,7 @@ public final class AssertUtil { LOGGER.info("elements = " + elements); } Assert.assertEquals(elements.size(), expectedSize, - "Size of expected and actual list don't match."); + "Size of expected and actual list don't match."); } /** @@ -218,7 +218,7 @@ public final class AssertUtil { Assert.assertFalse(execResult.hasSuceeded(), "Unexpectedly succeeded execResult: " + execResult); Assert.assertTrue((execResult.getError() + execResult.getOutput()).contains(expectedMessage), - "Expected error: " + expectedMessage + " in execResult: " + execResult); + "Expected error: " + expectedMessage + " in execResult: " + execResult); } /** @@ -258,13 +258,25 @@ public final class AssertUtil { */ public static void assertFailed(ServiceResponse response) throws JAXBException { Assert.assertNotEquals(response.getMessage(), "null", - "response message should not be null"); + "response message should not be null"); Assert.assertEquals(Util.parseResponse(response).getStatus(), APIResult.Status.FAILED); Assert.assertEquals(response.getCode(), 400); } /** + * Checks that Instance/Triage result status is FAILED. + * + * @param response APIResult response + */ + public static void assertFailed(APIResult response) { + Assert.assertNotEquals(response.getMessage(), "null", + "response message should not be null"); + Assert.assertEquals(response.getStatus(), APIResult.Status.FAILED, + "Status should be FAILED. Message: " + response.getMessage()); + } + + /** * Checks that ServiceResponse status is status FAILED with status code 403. * * @param response ServiceResponse http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bb375e/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/EntityLineageUtil.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/EntityLineageUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/EntityLineageUtil.java index 2df474d..3b6314f 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/EntityLineageUtil.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/EntityLineageUtil.java @@ -18,7 +18,9 @@ package org.apache.falcon.regression.core.util; +import org.apache.falcon.regression.core.enumsAndConstants.ResponseErrors; import org.apache.falcon.resource.LineageGraphResult; +import org.apache.falcon.resource.TriageResult; import org.apache.log4j.Logger; import org.joda.time.DateTime; import org.testng.Assert; @@ -59,17 +61,38 @@ public final class EntityLineageUtil{ LineageGraphResult.Edge[] expectedEdgeArray) { String[] actualVertices; LineageGraphResult.Edge[] actualEdgeArray; - actualVertices = lineageGraphResult.getVertices(); - actualEdgeArray = lineageGraphResult.getEdges(); + Set<String> actualVerticesSet = new HashSet<>(); + Set<LineageGraphResult.Edge> actualEdgeSet = new HashSet<>(); + + try { + actualVertices = lineageGraphResult.getVertices(); + actualVerticesSet = new HashSet<>(Arrays.asList(actualVertices)); + } catch (NullPointerException e) { + Assert.assertEquals(expectedVertices.length, 0); + } + try { + actualEdgeArray = lineageGraphResult.getEdges(); + actualEdgeSet = new HashSet<>(Arrays.asList(actualEdgeArray)); + } catch (NullPointerException e) { + Assert.assertEquals(expectedEdgeArray.length, 0); + } Set<LineageGraphResult.Edge> expectedEdgeSet = new HashSet<>(Arrays.asList(expectedEdgeArray)); - Set<LineageGraphResult.Edge> actualEdgeSet = new HashSet<>(Arrays.asList(actualEdgeArray)); - Set<String> expectedVerticesSet = new HashSet<>(Arrays.asList(expectedVertices)); - Set<String> actualVerticesSet = new HashSet<>(Arrays.asList(actualVertices)); - Assert.assertEquals(expectedEdgeSet, actualEdgeSet, "Edges dont match"); - Assert.assertEquals(expectedVerticesSet, actualVerticesSet, "Vertices dont match"); + Assert.assertEquals(actualEdgeSet, expectedEdgeSet, "Edges dont match"); + Assert.assertEquals(actualVerticesSet, expectedVerticesSet, "Vertices dont match"); + } + + /** + * Validates that failed response contains specific error message. + * @param triageResult response + * @param error expected error + */ + public static void validateError(TriageResult triageResult, ResponseErrors error) { + AssertUtil.assertFailed(triageResult); + Assert.assertTrue(triageResult.getMessage().contains(error.getError()), + "Error should contain '" + error + "'"); } /** http://git-wip-us.apache.org/repos/asf/falcon/blob/f4bb375e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/triage/TriageAPISingleColoTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/triage/TriageAPISingleColoTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/triage/TriageAPISingleColoTest.java new file mode 100644 index 0000000..70d1e17 --- /dev/null +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/triage/TriageAPISingleColoTest.java @@ -0,0 +1,447 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.regression.triage; + +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.Frequency; +import org.apache.falcon.entity.v0.SchemaHelper; +import org.apache.falcon.regression.Entities.ProcessMerlin; +import org.apache.falcon.regression.core.bundle.Bundle; +import org.apache.falcon.regression.core.enumsAndConstants.ResponseErrors; +import org.apache.falcon.regression.core.helpers.ColoHelper; +import org.apache.falcon.regression.core.util.InstanceUtil; +import org.apache.falcon.regression.core.util.BundleUtil; +import org.apache.falcon.regression.core.util.TimeUtil; +import org.apache.falcon.regression.core.util.OSUtil; +import org.apache.falcon.regression.core.util.OozieUtil; +import org.apache.falcon.regression.core.util.AssertUtil; +import org.apache.falcon.regression.core.util.EntityLineageUtil; +import org.apache.falcon.regression.testHelper.BaseTestClass; +import org.apache.falcon.resource.LineageGraphResult.Edge; +import org.apache.falcon.resource.SchedulableEntityInstance; +import org.apache.falcon.resource.TriageResult; +import org.apache.oozie.client.CoordinatorAction; +import org.apache.oozie.client.OozieClient; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Test Class for Testing the Triage API on a single colo corresponding to FALCON-1377. + */ +@Test(groups = {"singleCluster"}) +public class TriageAPISingleColoTest extends BaseTestClass { + private ColoHelper cluster = servers.get(0); + private OozieClient clusterOC = serverOC.get(0); + private String baseTestHDFSDir = cleanAndGetTestDir(); + private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator"; + private String outputFeedName, processName, clusterName; + private String startTime, endTime; + private String feedInputPath = baseTestHDFSDir + "/input" + MINUTE_DATE_PATTERN; + private String feedOutputPath = baseTestHDFSDir + "/output-data" + MINUTE_DATE_PATTERN; + private List<String> expectedVertexList = new ArrayList<>(); + private List<Edge> expectedEdgeList = new ArrayList<>(); + private TriageResult responseTriage; + + @BeforeMethod(alwaysRun = true) + public void setUp() throws Exception { + startTime = "2015-01-02T00:00Z"; + endTime = "2015-01-02T00:03Z"; + bundles[0] = BundleUtil.readELBundle(); + bundles[0].generateUniqueBundle(this); + bundles[0] = new Bundle(bundles[0], cluster); + bundles[0].setProcessWorkflow(aggregateWorkflowDir); + bundles[0].setProcessPeriodicity(5, Frequency.TimeUnit.minutes); + bundles[0].setProcessValidity(startTime, endTime); + bundles[0].setProcessConcurrency(1); + bundles[0].setInputFeedDataPath(feedInputPath); + bundles[0].setOutputFeedLocationData(feedOutputPath); + + processName = bundles[0].getProcessName(); + clusterName = bundles[0].getClusterNames().get(0); + outputFeedName = bundles[0].getOutputFeedNameFromBundle(); + + uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE); + + expectedEdgeList = new ArrayList<>(); + expectedVertexList = new ArrayList<>(); + } + + @AfterMethod(alwaysRun = true) + public void tearDown() throws IOException { + removeTestClassEntities(); + cleanTestsDirs(); + } + + /** + * Creates expected output based on entity type, and then calls the validation function + * to compare expected and actual graphs. + * @param entityType type of entity, whether process or feed + */ + private void createExpectedOutput(EntityType entityType) throws Exception{ + String finalInstanceTag; + + AssertUtil.assertSucceeded(responseTriage); + Assert.assertEquals(responseTriage.getTriageGraphs().length, 1); + String inputVertex1 = createVertex(bundles[0].getInputFeedNameFromBundle(), + TimeUtil.addMinsToTime(startTime, -20), EntityType.FEED, "Input[MISSING]"); + expectedVertexList.add(inputVertex1); + String inputVertex2 = createVertex(bundles[0].getInputFeedNameFromBundle(), startTime, + EntityType.FEED, "Input[MISSING]"); + expectedVertexList.add(inputVertex2); + if (entityType.equals(EntityType.PROCESS)) { + finalInstanceTag = "[WAITING]"; + } else { + finalInstanceTag = "Output[WAITING]"; + } + String processVertex = createVertex(processName, startTime, EntityType.PROCESS, finalInstanceTag); + expectedVertexList.add(processVertex); + if (entityType.equals(EntityType.FEED)) { + String outputVertex1 = createVertex(outputFeedName, startTime, + EntityType.FEED, "[MISSING]"); + expectedVertexList.add(outputVertex1); + expectedEdgeList.add(new Edge(processVertex, outputVertex1, "produces")); + } + expectedEdgeList.add(new Edge(inputVertex1, processVertex, "consumed by")); + expectedEdgeList.add(new Edge(inputVertex2, processVertex, "consumed by")); + + EntityLineageUtil.validateLineageGraphResult(responseTriage.getTriageGraphs()[0], + expectedVertexList.toArray(new String[expectedVertexList.size()]), + expectedEdgeList.toArray(new Edge[expectedEdgeList.size()])); + } + + /** + * Single process with one input and one output, of which one instance is in waiting, and request for triage + * on that instance on the server. There should be no output feed and the process instance is the terminal + * instance. Upon triaging on server on an output instance of the feed, an additional vertex and edge should + * be seen for this feed instance. + * + * @throws Exception + */ + @Test(dataProvider = "getParameters", groups = "embedded") + public void triageTestServer(EntityType entityType) throws Exception { + bundles[0].submitFeedsScheduleProcess(); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); + if (entityType.equals(EntityType.FEED)) { + responseTriage = cluster.getFeedHelper().getInstanceTriage(outputFeedName, + "?start=" + startTime); + } else { + responseTriage = cluster.getProcessHelper().getInstanceTriage(processName, + "?start=" + startTime); + } + //Creating expected vertices and graphs + createExpectedOutput(entityType); + } + + /** + * Single process with one input and one output, of which one instance is in waiting, and request for triage + * on that instance on the server. There should be no output feed and the process instance is the terminal + * instance. Upon triaging on prism on an output instance of the feed, an additional vertex and edge should + * be seen for this feed instance. + * + * @throws Exception + */ + @Test(dataProvider = "getParameters", groups = "distributed") + public void triageTestPrism(EntityType entityType) throws Exception { + bundles[0].submitFeedsScheduleProcess(); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); + if (entityType.equals(EntityType.FEED)) { + responseTriage = prism.getFeedHelper().getInstanceTriage(outputFeedName, + "?start=" + startTime); + } else { + responseTriage = prism.getProcessHelper().getInstanceTriage(processName, + "?start=" + startTime); + } + //Creating expected vertices and graphs + createExpectedOutput(entityType); + } + + /** + * Single process with one input and one output, but we triage on a non-existent feed/process + * instance on the server. Appropriate error should be thrown. + * + * @throws Exception + */ + @Test(dataProvider = "getParameters", groups = "embedded") + public void invalidInstanceOnServerTest(EntityType entityType) throws Exception { + bundles[0].submitFeedsScheduleProcess(); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); + if (entityType.equals(EntityType.FEED)) { + responseTriage = cluster.getFeedHelper().getInstanceTriage(outputFeedName, + "?start=" + TimeUtil.addMinsToTime(startTime, 2)); + } else { + responseTriage = cluster.getProcessHelper().getInstanceTriage(processName, + "?start=" + TimeUtil.addMinsToTime(startTime, 2)); + } + EntityLineageUtil.validateError(responseTriage, ResponseErrors.INVALID_INSTANCE_TIME); + } + + /** + * Single process with one input and one output, but we triage on a non-existent feed/process + * instance on the prism. Appropriate error should be thrown. + * + * @throws Exception + */ + @Test(dataProvider = "getParameters", groups = "distributed") + public void invalidInstanceOnPrismTest(EntityType entityType) throws Exception { + bundles[0].submitFeedsScheduleProcess(); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); + if (entityType.equals(EntityType.FEED)) { + responseTriage = prism.getFeedHelper().getInstanceTriage(outputFeedName, + "?start=" + TimeUtil.addMinsToTime(startTime, 2)); + } else { + responseTriage = prism.getProcessHelper().getInstanceTriage(processName, + "?start=" + TimeUtil.addMinsToTime(startTime, 2)); + } + EntityLineageUtil.validateError(responseTriage, ResponseErrors.INVALID_INSTANCE_TIME); + } + + /** + * Submit and Schedule a process on one cluster via prism, and triage on an instance of the process on a different + * cluster. Appropriate error should be thrown. + * + * @throws Exception + */ + @Test(groups = "embedded") + public void processTriageOnServerWhereProcessDoesNotExistTest() throws Exception { + responseTriage = servers.get(1).getProcessHelper().getInstanceTriage(processName, + "?start=" + startTime); + EntityLineageUtil.validateError(responseTriage, ResponseErrors.PROCESS_NOT_FOUND); + } + + /** + * Single process with one input and one output which succeeds. Triage on server, on a succeeded instance, + * and we should get just one vertex in the graph, without any edges. + * + * @throws Exception + */ + @Test(groups = "embedded") + public void processInstanceSucceededTriageOnServerTest() throws Exception { + bundles[0].submitFeedsScheduleProcess(); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); + OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); + InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, + CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS, 5); + responseTriage = cluster.getProcessHelper().getInstanceTriage(processName, + "?start=" + startTime); + AssertUtil.assertSucceeded(responseTriage); + Assert.assertEquals(responseTriage.getTriageGraphs().length, 1); + + //There'll be just one process instance vertex and no edges + Assert.assertEquals(responseTriage.getTriageGraphs()[0].getVertices().length, 1); + String processVertex = createVertex(processName, startTime, EntityType.PROCESS, "[SUCCEEDED]"); + expectedVertexList.add(processVertex); + + EntityLineageUtil.validateLineageGraphResult(responseTriage.getTriageGraphs()[0], + expectedVertexList.toArray(new String[expectedVertexList.size()]), + expectedEdgeList.toArray(new Edge[expectedEdgeList.size()])); + } + + /** + * Single process with one input and one output which succeeds. Triage on prism, on a succeeded instance, + * and we should get just one vertex in the graph, without any edges. + * + * @throws Exception + */ + @Test(groups = "distributed") + public void processInstanceSucceededTriageOnPrismTest() throws Exception { + bundles[0].submitFeedsScheduleProcess(); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); + OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); + InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, + CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS, 5); + responseTriage = prism.getProcessHelper().getInstanceTriage(processName, + "?start=" + startTime); + AssertUtil.assertSucceeded(responseTriage); + Assert.assertEquals(responseTriage.getTriageGraphs().length, 1); + + //There'll be just one process instance vertex and no edges + Assert.assertEquals(responseTriage.getTriageGraphs()[0].getVertices().length, 1); + String processVertex = createVertex(processName, startTime, EntityType.PROCESS, + "[SUCCEEDED]"); + expectedVertexList.add(processVertex); + + EntityLineageUtil.validateLineageGraphResult(responseTriage.getTriageGraphs()[0], + expectedVertexList.toArray(new String[expectedVertexList.size()]), + expectedEdgeList.toArray(new Edge[expectedEdgeList.size()])); + } + + /** + * Single process one instance of whose output feed is fed as input to the process. This is to test the closed + * loop condition, in case we triage on a process instance, or on an output feed instance. + * + * @throws Exception + */ + @Test(groups = "distributed") + public void cycleTest() throws Exception { + //Setting an instance of the output feed of a process as input to the process + bundles[0].addProcessInput("inputData2", outputFeedName); + ProcessMerlin processObj = new ProcessMerlin(bundles[0].getProcessData()); + processObj.getInputs().getInputs().get(1).setStart("now(-1,0)"); + processObj.getInputs().getInputs().get(1).setEnd("now(-1,0)"); + bundles[0].setProcessData(processObj.toString()); + bundles[0].submitFeedsScheduleProcess(); + + responseTriage = prism.getFeedHelper().getInstanceTriage(outputFeedName, + "?start=" + startTime); + AssertUtil.assertSucceeded(responseTriage); + Assert.assertEquals(responseTriage.getTriageGraphs().length, 1); + + //There'll be four feed instance vertices and one process instance vertex + String inputVertex1 = createVertex(bundles[0].getInputFeedNameFromBundle(), + TimeUtil.addMinsToTime(startTime, -20), EntityType.FEED, "Input[MISSING]"); + expectedVertexList.add(inputVertex1); + String inputVertex2 = createVertex(bundles[0].getInputFeedNameFromBundle(), startTime, + EntityType.FEED, "Input[MISSING]"); + expectedVertexList.add(inputVertex2); + String inputVertex3 = createVertex(outputFeedName, TimeUtil.addMinsToTime(startTime, -60), + EntityType.FEED, "Input[MISSING]"); + expectedVertexList.add(inputVertex3); + String processVertex = createVertex(processName, startTime, EntityType.PROCESS, + "Output[WAITING]"); + expectedVertexList.add(processVertex); + String outputVertex1 = createVertex(outputFeedName, startTime, + EntityType.FEED, "[MISSING]"); + expectedVertexList.add(outputVertex1); + expectedEdgeList.add(new Edge(inputVertex1, processVertex, "consumed by")); + expectedEdgeList.add(new Edge(inputVertex2, processVertex, "consumed by")); + expectedEdgeList.add(new Edge(inputVertex3, processVertex, "consumed by")); + expectedEdgeList.add(new Edge(processVertex, outputVertex1, "produces")); + + EntityLineageUtil.validateLineageGraphResult(responseTriage.getTriageGraphs()[0], + expectedVertexList.toArray(new String[expectedVertexList.size()]), + expectedEdgeList.toArray(new Edge[expectedEdgeList.size()])); + + responseTriage = prism.getProcessHelper().getInstanceTriage(processName, "?start=" + startTime); + AssertUtil.assertSucceeded(responseTriage); + Assert.assertEquals(responseTriage.getTriageGraphs().length, 1); + + //There'll be three feed instance vertices and one process instance vertex + expectedVertexList = new ArrayList<>(); + expectedEdgeList = new ArrayList<>(); + inputVertex1 = createVertex(bundles[0].getInputFeedNameFromBundle(), TimeUtil.addMinsToTime(startTime, -20), + EntityType.FEED, "Input[MISSING]"); + expectedVertexList.add(inputVertex1); + inputVertex2 = createVertex(bundles[0].getInputFeedNameFromBundle(), startTime, + EntityType.FEED, "Input[MISSING]"); + expectedVertexList.add(inputVertex2); + inputVertex3 = createVertex(outputFeedName, TimeUtil.addMinsToTime(startTime, -60), + EntityType.FEED, "Input[MISSING]"); + expectedVertexList.add(inputVertex3); + processVertex = createVertex(processName, startTime, EntityType.PROCESS, + "[WAITING]"); + expectedVertexList.add(processVertex); + expectedEdgeList.add(new Edge(inputVertex1, processVertex, "consumed by")); + expectedEdgeList.add(new Edge(inputVertex2, processVertex, "consumed by")); + expectedEdgeList.add(new Edge(inputVertex3, processVertex, "consumed by")); + + EntityLineageUtil.validateLineageGraphResult(responseTriage.getTriageGraphs()[0], + expectedVertexList.toArray(new String[expectedVertexList.size()]), + expectedEdgeList.toArray(new Edge[expectedEdgeList.size()])); + } + + /** + * Two Dependent processes, where one consumes the output of the other. Triage on the output of the + * second process. + * + * @throws Exception + */ + @Test(groups = "distributed") + public void twoDependentProcessesTest() throws Exception { + bundles[0].submitFeedsScheduleProcess(); //this process will stay in waiting + + //There'll be three feed instance vertices (2 input, 1 output) and two process instance vertices + String inputVertex1 = createVertex(bundles[0].getInputFeedNameFromBundle(), + TimeUtil.addMinsToTime(startTime, -20), EntityType.FEED, "Input[MISSING]"); + expectedVertexList.add(inputVertex1); + String inputVertex2 = createVertex(bundles[0].getInputFeedNameFromBundle(), startTime, + EntityType.FEED, "Input[MISSING]"); + expectedVertexList.add(inputVertex2); + String processVertex1 = createVertex(processName, startTime, EntityType.PROCESS, + "Output[WAITING]"); + expectedVertexList.add(processVertex1); + String outputVertex1 = createVertex(outputFeedName, startTime, + EntityType.FEED, "Input[MISSING]"); + expectedVertexList.add(outputVertex1); + + //preparing second process + bundles[0].setProcessValidity(TimeUtil.addMinsToTime(startTime, 60), TimeUtil.addMinsToTime(startTime, 61)); + ProcessMerlin processObj = new ProcessMerlin(bundles[0].getProcessData()); + processObj.getOutputs().getOutputs().get(0).setFeed(processObj.getInputs().getInputs().get(0).getFeed()); + //The input of this process is the output of the previous process + processObj.getInputs().getInputs().get(0).setFeed(outputFeedName); + processObj.setName("ConsumerOfFirstProcessOutput"); + bundles[0].setProcessData(processObj.toString()); + bundles[0].setProcessInputStartEnd("now(-1,0)", "now(-1,0)"); + bundles[0].setProcessWorkflow(aggregateWorkflowDir); + bundles[0].setProcessPeriodicity(5, Frequency.TimeUnit.minutes); + bundles[0].submitFeedsScheduleProcess(); + responseTriage = prism.getFeedHelper().getInstanceTriage(bundles[0].getOutputFeedNameFromBundle(), + "?start=" + TimeUtil.addMinsToTime(startTime, 60)); + AssertUtil.assertSucceeded(responseTriage); + Assert.assertEquals(responseTriage.getTriageGraphs().length, 1); + + //Adding one more process vertex and output feed vertex + String processVertex2 = createVertex("ConsumerOfFirstProcessOutput", TimeUtil.addMinsToTime(startTime, 60), + EntityType.PROCESS, "Output[WAITING]"); + expectedVertexList.add(processVertex2); + String outputVertex2 = createVertex(bundles[0].getOutputFeedNameFromBundle(), + TimeUtil.addMinsToTime(startTime, 60), EntityType.FEED, "[MISSING]"); + expectedVertexList.add(outputVertex2); + + expectedEdgeList.add(new Edge(inputVertex1, processVertex1, "consumed by")); + expectedEdgeList.add(new Edge(inputVertex2, processVertex1, "consumed by")); + expectedEdgeList.add(new Edge(processVertex1, outputVertex1, "produces")); + expectedEdgeList.add(new Edge(outputVertex1, processVertex2, "consumed by")); + expectedEdgeList.add(new Edge(processVertex2, outputVertex2, "produces")); + + EntityLineageUtil.validateLineageGraphResult(responseTriage.getTriageGraphs()[0], + expectedVertexList.toArray(new String[expectedVertexList.size()]), + expectedEdgeList.toArray(new Edge[expectedEdgeList.size()])); + } + + /** + * Data Provider enables the same test to run for triage on entities feed and process. + */ + @DataProvider + private Object[][] getParameters() { + return new Object[][]{{EntityType.FEED}, {EntityType.PROCESS}}; + } + + /** + * Creates a vertex out of the fields provided. + * @param name name of process/feed + * @param instanceTime instance time + * @param entityType if entity is process or feed + * @param tags status of the feed + */ + private String createVertex(String name, String instanceTime, EntityType entityType, String tags) { + SchedulableEntityInstance vertex = new SchedulableEntityInstance(name, clusterName, + SchemaHelper.parseDateUTC(instanceTime), entityType); + vertex.setTags(tags); + return vertex.toString(); + } +}
