Repository: falcon Updated Branches: refs/heads/master b606ffcf5 -> f64e6c162
FALCON-1321 Add Entity Lineage Test. Contributed by Pragya Mittal. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/f64e6c16 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/f64e6c16 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/f64e6c16 Branch: refs/heads/master Commit: f64e6c162945fcf46e83e949944437c09fd5d166 Parents: b606ffc Author: Ajay Yadava <[email protected]> Authored: Mon Aug 3 15:20:14 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Mon Aug 3 15:20:14 2015 +0530 ---------------------------------------------------------------------- falcon-regression/CHANGES.txt | 2 + .../regression/Entities/ProcessMerlin.java | 48 ++++ .../helpers/entity/AbstractEntityHelper.java | 17 ++ .../core/response/ServiceResponse.java | 12 + .../regression/core/util/EntityLineageUtil.java | 65 ++++++ .../falcon/regression/core/util/Util.java | 21 ++ .../org/apache/falcon/request/RequestKeys.java | 1 + .../regression/lineage/EntityLineageTest.java | 221 +++++++++++++++++++ 8 files changed, 387 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/f64e6c16/falcon-regression/CHANGES.txt ---------------------------------------------------------------------- diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt index 73ce75b..1a300d5 100644 --- a/falcon-regression/CHANGES.txt +++ b/falcon-regression/CHANGES.txt @@ -5,6 +5,8 @@ Trunk (Unreleased) INCOMPATIBLE CHANGES NEW FEATURES + FALCON-1321 Add Entity Lineage Test (Pragya Mittal via Ajay Yadava) + FALCON-1319 Contribute HiveDr, Mirror tests and some test fixes (Namit Maheshwari, Paul Isaychuk, Raghav Kumar Gautam & Ruslan Ostafiychuk via Raghav Kumar Gautam) http://git-wip-us.apache.org/repos/asf/falcon/blob/f64e6c16/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java 
---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java index ae5c70c..b905bee 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java @@ -626,6 +626,66 @@ public class ProcessMerlin extends Process { draft.setWorkflow(null, null, null); return draft; } + + /** + * Replaces all existing inputs with a single input named {@code inputName} that reads {@code feedName}. + * Start, end, partition and the optional flag are copied from the current first input, + * so the process must already have at least one input. + * @param inputName name of the new (and only) input + * @param feedName feed consumed by the new input + */ + public void resetInputFeed(String inputName, String feedName) { + Input in1 = getInputs().getInputs().get(0); + getInputs().getInputs().clear(); + Input in2 = new Input(); + in2.setEnd(in1.getEnd()); + in2.setFeed(feedName); + in2.setName(inputName); + in2.setPartition(in1.getPartition()); + in2.setStart(in1.getStart()); + in2.setOptional(in1.isOptional()); + getInputs().getInputs().add(in2); + } + + /** + * Replaces all existing outputs with a single output named {@code outputName} that writes {@code feedName}. + * The instance value is copied from the current first output, + * so the process must already have at least one output. + * @param outputName name of the new (and only) output + * @param feedName feed produced by the new output + */ + public void resetOutputFeed(String outputName, String feedName) { + Output out1 = getOutputs().getOutputs().get(0); + getOutputs().getOutputs().clear(); + Output out2 = new Output(); + out2.setFeed(feedName); + out2.setName(outputName); + out2.setInstance(out1.getInstance()); + getOutputs().getOutputs().add(out2); + } + + /** + * Adds every feed in {@code ipFeed} as an additional input + * by calling {@code addInputFeed(feed, feed)} for each entry. + * @param ipFeed names of the feeds to add as inputs + */ + public void addInputFeeds(String[] ipFeed) { + for (String feed : ipFeed) { + addInputFeed(feed, feed); + } + } + + /** + * Adds every feed in {@code opFeed} as an additional output + * by calling {@code addOutputFeed(feed, feed)} for each entry.
+ * @param opFeed names of the feeds to add as outputs + */ + public void addOutputFeeds(String[] opFeed) { + for (String feed : opFeed) { + addOutputFeed(feed, feed); + } + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/f64e6c16/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java index 7b8d111..2baa35f 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java @@ -669,4 +669,23 @@ public abstract class AbstractEntityHelper { entityName + colo); return Util.sendRequest(url, "post", data, user); } + + + /** + * Retrieves the entity lineage graph from the {@code ENTITY_LINEAGE} endpoint. + * The request is sent without a user and with a JSON Content-Type header. + * @param params optional query string to append, e.g. {@code pipeline=<name>}; ignored when empty + * @return raw service response; use {@link ServiceResponse#getLineageGraphResult()} to parse the graph + */ + public ServiceResponse getEntityLineage(String params) + throws URISyntaxException, AuthenticationException, InterruptedException, IOException { + + String url = createUrl(this.hostname + URLS.ENTITY_LINEAGE.getValue(), colo); + if (StringUtils.isNotEmpty(params)){ + // a non-empty colo presumably already opened the query string, so further parameters are joined with '&' + url += colo.isEmpty() ? "?" 
+ params : "&" + params; + } + return Util.sendRequestLineage(createUrl(url), "get", null, null); + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/f64e6c16/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java index 175bb98..55e862c 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java @@ -18,11 +18,13 @@ package org.apache.falcon.regression.core.response; +import com.google.gson.GsonBuilder; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.falcon.regression.core.util.Util; import org.apache.falcon.resource.EntityList; import org.apache.falcon.resource.EntitySummaryResult; +import org.apache.falcon.resource.LineageGraphResult; import org.apache.http.HttpResponse; import org.apache.log4j.Logger; @@ -109,4 +111,14 @@ public class ServiceResponse { return null; } } + + /** + * Parses the response message as JSON into a {@link LineageGraphResult}.
+ * @return the lineage graph described by the message + * @throws com.google.gson.JsonSyntaxException if the message is not valid JSON for a {@link LineageGraphResult} + */ + public LineageGraphResult getLineageGraphResult() { + return new GsonBuilder().create().fromJson(message, LineageGraphResult.class); + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/f64e6c16/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/EntityLineageUtil.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/EntityLineageUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/EntityLineageUtil.java new file mode 100644 index 0000000..fc42cf5 --- /dev/null +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/EntityLineageUtil.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.falcon.regression.core.util; + +import org.apache.falcon.resource.LineageGraphResult; +import org.apache.log4j.Logger; +import org.testng.Assert; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +/** + * Utility functions related to entity lineage. + */ +public final class EntityLineageUtil { + + private static final Logger LOGGER = Logger.getLogger(EntityLineageUtil.class); + + private EntityLineageUtil() { + throw new AssertionError("Instantiating utility class..."); + } + + /** + * Asserts that the lineage graph has exactly the expected vertices and edges; both are compared as sets. + * @param lineageGraphResult entity lineage result returned by the server; must not be null + * @param expectedVertices array of expected vertex names + * @param expectedEdgeArray array of expected edges + */ + public static void validateLineageGraphResult(LineageGraphResult lineageGraphResult, String[] expectedVertices, + LineageGraphResult.Edge[] expectedEdgeArray) { + Assert.assertNotNull(lineageGraphResult, "Lineage graph result is null"); + String[] actualVertices = lineageGraphResult.getVertices(); + LineageGraphResult.Edge[] actualEdgeArray = lineageGraphResult.getEdges(); + Assert.assertNotNull(actualVertices, "Lineage graph result has no vertices"); + Assert.assertNotNull(actualEdgeArray, "Lineage graph result has no edges"); + + Set<LineageGraphResult.Edge> expectedEdgeSet = new HashSet<>(Arrays.asList(expectedEdgeArray)); + Set<LineageGraphResult.Edge> actualEdgeSet = new HashSet<>(Arrays.asList(actualEdgeArray)); + + Set<String> expectedVerticesSet = new HashSet<>(Arrays.asList(expectedVertices)); + Set<String> actualVerticesSet = new HashSet<>(Arrays.asList(actualVertices)); + + // TestNG's assertEquals takes (actual, expected, message); keep that order so failure output is accurate. + Assert.assertEquals(actualEdgeSet, expectedEdgeSet, "Edges dont match"); + Assert.assertEquals(actualVerticesSet, expectedVerticesSet, "Vertices dont match"); + } +} + http://git-wip-us.apache.org/repos/asf/falcon/blob/f64e6c16/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java 
b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java index c220215..6c8d4ee 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java @@ -382,6 +382,7 @@ public final class Util { STATUS_URL("/api/entities/status"), ENTITY_SUMMARY("/api/entities/summary"), SUBMIT_AND_SCHEDULE_URL("/api/entities/submitAndSchedule"), + ENTITY_LINEAGE("/api/metadata/lineage/entities"), INSTANCE_RUNNING("/api/instance/running"), INSTANCE_STATUS("/api/instance/status"), INSTANCE_KILL("/api/instance/kill"), @@ -559,4 +560,25 @@ public final class Util { return className; } } + + /** + * Sends an API request with a JSON {@code Content-Type} header. + * @param url target url + * @param method request method + * @param data data to be placed in the body of the request + * @param user user to be used to send request + * @return api response + * @throws IOException propagated from the underlying request + * @throws URISyntaxException propagated from the underlying request + * @throws AuthenticationException propagated from the underlying request + * @throws InterruptedException propagated from the underlying request + */ + public static ServiceResponse sendRequestLineage(String url, String method, String data, String user) + throws IOException, URISyntaxException, AuthenticationException, InterruptedException { + BaseRequest request = new BaseRequest(url, method, user, data); + request.addHeader(RequestKeys.CONTENT_TYPE_HEADER, RequestKeys.JSON_CONTENT_TYPE); + HttpResponse response = request.run(); + return new ServiceResponse(response); + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/f64e6c16/falcon-regression/merlin-core/src/main/java/org/apache/falcon/request/RequestKeys.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/request/RequestKeys.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/request/RequestKeys.java index b2e38b2..05d5fc6 100644 --- 
a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/request/RequestKeys.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/request/RequestKeys.java @@ -26,6 +26,7 @@ public final class RequestKeys { public static final String CONTENT_TYPE_HEADER = "Content-Type"; public static final String XML_CONTENT_TYPE = "text/xml"; + /** {@code Content-Type} header value for JSON payloads. */ public static final String JSON_CONTENT_TYPE = "application/json"; public static final String AUTH_COOKIE = "hadoop.auth"; public static final String AUTH_COOKIE_EQ = AUTH_COOKIE + "="; http://git-wip-us.apache.org/repos/asf/falcon/blob/f64e6c16/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntityLineageTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntityLineageTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntityLineageTest.java new file mode 100644 index 0000000..52cb198 --- /dev/null +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntityLineageTest.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.falcon.regression.lineage; + +import org.apache.falcon.Pair; +import org.apache.falcon.entity.v0.Frequency; +import org.apache.falcon.entity.v0.Frequency.TimeUnit; +import org.apache.falcon.entity.v0.feed.LocationType; +import org.apache.falcon.regression.Entities.FeedMerlin; +import org.apache.falcon.regression.Entities.ProcessMerlin; +import org.apache.falcon.regression.core.bundle.Bundle; +import org.apache.falcon.regression.core.helpers.ColoHelper; +import org.apache.falcon.regression.core.response.ServiceResponse; +import org.apache.falcon.regression.core.util.AssertUtil; +import org.apache.falcon.regression.core.util.BundleUtil; +import org.apache.falcon.regression.core.util.Generator; +import org.apache.falcon.regression.core.util.OSUtil; +import org.apache.falcon.regression.core.util.EntityLineageUtil; +import org.apache.falcon.regression.testHelper.BaseTestClass; +import org.apache.falcon.resource.LineageGraphResult; +import org.apache.hadoop.security.authentication.client.AuthenticationException; +import org.apache.log4j.Logger; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import javax.xml.bind.JAXBException; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * Test Suite for Entity lineage. 
+ */ +@Test(groups = "embedded") +public class EntityLineageTest extends BaseTestClass { + + private String baseTestDir = cleanAndGetTestDir(); + private String aggregateWorkflowDir = baseTestDir + "/aggregator"; + private ColoHelper cluster = servers.get(0); + private static final Logger LOGGER = Logger.getLogger(EntityLineageTest.class); + + private int numInputFeeds; + private String startTime = "2015-06-06T09:37Z"; + private String endTime = "2015-06-06T09:45Z"; + + @BeforeMethod(alwaysRun = true) + public void setup() throws Exception { + LOGGER.info("Time range between : " + startTime + " and " + endTime); + uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE); + bundles[0] = BundleUtil.readELBundle(); + bundles[0] = new Bundle(bundles[0], cluster); + bundles[0].generateUniqueBundle(this); + bundles[0].submitClusters(prism); + + bundles[0].setProcessWorkflow(aggregateWorkflowDir); + bundles[0].setProcessValidity(startTime, endTime); + bundles[0].setProcessConcurrency(6); + bundles[0].setProcessPeriodicity(10, TimeUnit.minutes); + } + + @AfterMethod(alwaysRun = true) + public void tearDown() throws IOException { + removeTestClassEntities(); + cleanTestsDirs(); + } + + @Test(groups = {"singleCluster"}) + public void lineageTestFail() throws Exception { + String failedPipelineName = "pipeline2"; + ServiceResponse response = prism.getProcessHelper().getEntityLineage("pipeline=" + failedPipelineName); + AssertUtil.assertFailed(response, "No processes belonging to pipeline " + failedPipelineName); + } + + @Test(groups = {"singleCluster"}) + public void lineageTestPass() throws Exception { + + String pipelineName = "pipeline1"; + + FeedMerlin inputMerlin = new FeedMerlin(bundles[0].getInputFeedFromBundle()); + inputMerlin.setValidity("2014-01-01T01:00Z", "2016-12-12T22:00Z"); + inputMerlin.setFrequency(new Frequency("5", TimeUnit.minutes)); + numInputFeeds = 10; + + FeedMerlin[] feedMerlins = generateFeeds(numInputFeeds, inputMerlin, +
Generator.getNameGenerator("Feed", ""), + Generator.getHadoopPathGenerator(baseTestDir, MINUTE_DATE_PATTERN)); + + // Each pair holds the (input feeds, output feeds) of one process, in creation order. + List<Pair<String[], String[]>> processFeed = Arrays.asList( + new Pair<>(new String[]{"Feed001"}, new String[]{"Feed002", "Feed005"}), + new Pair<>(new String[]{"Feed002"}, new String[]{"Feed003"}), + new Pair<>(new String[]{"Feed002"}, new String[]{"Feed004"}), + new Pair<>(new String[]{"Feed003", "Feed004", "Feed005"}, new String[]{"Feed001", "Feed006"}), + new Pair<>(new String[]{"Feed006"}, new String[]{"Feed007"}), + new Pair<>(new String[]{"Feed008", "Feed010"}, new String[]{"Feed009"}), + new Pair<>(new String[]{"Feed009"}, new String[]{"Feed010"}) + ); + List<ProcessMerlin> processMerlins = new ArrayList<>(); + Generator nameGenerator=Generator.getNameGenerator("Process", ""); + + for (Pair<String[], String[]> feeds : processFeed) { + processMerlins.add(createProcess(nameGenerator.generate().replace("-", ""), pipelineName, + feeds.first, feeds.second)); + } + + // Names are deterministic (see validateLineage), so presumably this removes leftovers of an earlier run. + deleteProcess(processMerlins); + deleteFeed(Arrays.asList(feedMerlins)); + + for (FeedMerlin feed : feedMerlins) { + AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feed.toString())); + } + + for(ProcessMerlin process : processMerlins) { + AssertUtil.assertSucceeded(prism.getProcessHelper().submitEntity(process.toString())); + } + + LineageGraphResult lineageGraphResult = prism.getProcessHelper().getEntityLineage("pipeline=" + pipelineName) + .getLineageGraphResult(); + + LOGGER.info("LineageGraphResult : " + lineageGraphResult); + + validateLineage(lineageGraphResult); + + deleteProcess(processMerlins); + deleteFeed(Arrays.asList(feedMerlins)); + } + + public ProcessMerlin createProcess(String processName, String pipelineName, String[] inputFeed, + String[] outputFeed) throws URISyntaxException, AuthenticationException, + InterruptedException, IOException, +
JAXBException { + ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessObject().toString()); + processMerlin.setName(processName); + processMerlin.setPipelineTag(pipelineName); + setProcessData(processMerlin, inputFeed, outputFeed); + return processMerlin; + } + + private void setProcessData(ProcessMerlin processMerlin, String[] ipName, String[] opName) { + processMerlin.resetInputFeed(ipName[0], ipName[0]); + processMerlin.resetOutputFeed(opName[0], opName[0]); + + if (ipName.length > 1) { + processMerlin.addInputFeeds(Arrays.copyOfRange(ipName, 1, ipName.length)); + } + if (opName.length > 1) { + processMerlin.addOutputFeeds(Arrays.copyOfRange(opName, 1, opName.length)); + } + } + + public static FeedMerlin[] generateFeeds(final int numInputFeeds, + final FeedMerlin originalFeedMerlin, + final Generator nameGenerator, + final Generator pathGenerator) { + FeedMerlin[] inputFeeds = new FeedMerlin[numInputFeeds]; + // build (but do not submit) one copy of the template feed per generated name and data path + for(int count = 0; count < numInputFeeds; ++count) { + final FeedMerlin feed = new FeedMerlin(originalFeedMerlin.toString()); + feed.setName(nameGenerator.generate().replace("-", "")); + feed.setLocation(LocationType.DATA, pathGenerator.generate()); + inputFeeds[count] = feed; + } + return inputFeeds; + } + + private void deleteProcess(List<ProcessMerlin> processMerlins) throws InterruptedException, IOException, + URISyntaxException, JAXBException, AuthenticationException { + for(ProcessMerlin process : processMerlins) { + AssertUtil.assertSucceeded(prism.getProcessHelper().delete(process.toString())); + } + } + + private void deleteFeed(List<FeedMerlin> feedMerlins) throws InterruptedException, IOException, + URISyntaxException, JAXBException, AuthenticationException { + for(FeedMerlin feed : feedMerlins) { +
AssertUtil.assertSucceeded(prism.getFeedHelper().delete(feed.toString())); + } + } + + /** + * Checks the lineage of the pipeline built in {@link #lineageTestPass()}: one vertex per process and one + * (producer, consumer, feed) edge for every feed written by one process and read by another. + */ + private void validateLineage(LineageGraphResult lineageGraphResult) { + String[] expectedVertices = {"Process001", "Process002", "Process003", "Process004", "Process005", + "Process006", "Process007"}; + LineageGraphResult.Edge[] expectedEdgeArray = { + new LineageGraphResult.Edge("Process001", "Process002", "Feed002"), + new LineageGraphResult.Edge("Process001", "Process003", "Feed002"), + new LineageGraphResult.Edge("Process001", "Process004", "Feed005"), + new LineageGraphResult.Edge("Process002", "Process004", "Feed003"), + new LineageGraphResult.Edge("Process003", "Process004", "Feed004"), + new LineageGraphResult.Edge("Process004", "Process001", "Feed001"), + new LineageGraphResult.Edge("Process004", "Process005", "Feed006"), + new LineageGraphResult.Edge("Process006", "Process007", "Feed009"), + new LineageGraphResult.Edge("Process007", "Process006", "Feed010"), + }; + + EntityLineageUtil.validateLineageGraphResult(lineageGraphResult, expectedVertices, expectedEdgeArray); + } +} +
