Repository: incubator-atlas Updated Branches: refs/heads/master 0defc6e80 -> 161079155
ATLAS-537 Falcon hook failing when tried to submit a process which creates a hive table. ( shwethags via sumasai) Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/16107915 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/16107915 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/16107915 Branch: refs/heads/master Commit: 161079155c403b86f4318e683ae46356017c7624 Parents: 0defc6e Author: Suma Shivaprasad <[email protected]> Authored: Wed Mar 9 11:33:18 2016 -0800 Committer: Suma Shivaprasad <[email protected]> Committed: Wed Mar 9 11:33:18 2016 -0800 ---------------------------------------------------------------------- .../apache/atlas/falcon/hook/FalconHook.java | 12 ++- .../apache/atlas/falcon/hook/FalconHookIT.java | 81 ++++++++++++++++---- .../src/test/resources/feed-hdfs.xml | 39 ++++++++++ release-log.txt | 1 + 4 files changed, 113 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/16107915/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java index 05765bb..47fa714 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java @@ -235,8 +235,10 @@ public class FalconHook extends FalconEventPublisher { if (process.getInputs() != null) { for (Input input : process.getInputs().getInputs()) { List<Referenceable> clusterInputs = getInputOutputEntity(cluster, input.getFeed()); - entities.addAll(clusterInputs); - 
inputs.add(clusterInputs.get(clusterInputs.size() -1 )); + if (clusterInputs != null) { + entities.addAll(clusterInputs); + inputs.add(clusterInputs.get(clusterInputs.size() - 1)); + } } } @@ -244,8 +246,10 @@ public class FalconHook extends FalconEventPublisher { if (process.getOutputs() != null) { for (Output output : process.getOutputs().getOutputs()) { List<Referenceable> clusterOutputs = getInputOutputEntity(cluster, output.getFeed()); - entities.addAll(clusterOutputs); - outputs.add(clusterOutputs.get(clusterOutputs.size() -1 )); + if (clusterOutputs != null) { + entities.addAll(clusterOutputs); + outputs.add(clusterOutputs.get(clusterOutputs.size() - 1)); + } } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/16107915/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java index 12b7a8b..3881bd6 100644 --- a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java +++ b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java @@ -43,12 +43,15 @@ import javax.xml.bind.JAXBException; import java.util.List; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; public class FalconHookIT { public static final Logger LOG = org.slf4j.LoggerFactory.getLogger(FalconHookIT.class); public static final String CLUSTER_RESOURCE = "/cluster.xml"; public static final String FEED_RESOURCE = "/feed.xml"; + public static final String FEED_HDFS_RESOURCE = "/feed-hdfs.xml"; public static final String PROCESS_RESOURCE = "/process.xml"; private AtlasClient dgiCLient; @@ -96,21 +99,13 @@ public class FalconHookIT { Cluster cluster = loadEntity(EntityType.CLUSTER, 
CLUSTER_RESOURCE, "cluster" + random()); STORE.publish(EntityType.CLUSTER, cluster); - Feed infeed = loadEntity(EntityType.FEED, FEED_RESOURCE, "feedin" + random()); - org.apache.falcon.entity.v0.feed.Cluster feedCluster = infeed.getClusters().getClusters().get(0); - feedCluster.setName(cluster.getName()); - String inTableName = "table" + random(); - String inDbName = "db" + random(); - feedCluster.getTable().setUri(getTableUri(inDbName, inTableName)); - STORE.publish(EntityType.FEED, infeed); + Feed infeed = getTableFeed(FEED_RESOURCE, cluster.getName()); + String inTableName = getTableName(infeed); + String inDbName = getDBName(infeed); - Feed outfeed = loadEntity(EntityType.FEED, FEED_RESOURCE, "feedout" + random()); - feedCluster = outfeed.getClusters().getClusters().get(0); - feedCluster.setName(cluster.getName()); - String outTableName = "table" + random(); - String outDbName = "db" + random(); - feedCluster.getTable().setUri(getTableUri(outDbName, outTableName)); - STORE.publish(EntityType.FEED, outfeed); + Feed outfeed = getTableFeed(FEED_RESOURCE, cluster.getName()); + String outTableName = getTableName(outfeed); + String outDbName = getDBName(outfeed); Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE, "process" + random()); process.getClusters().getClusters().get(0).setName(cluster.getName()); @@ -120,6 +115,7 @@ public class FalconHookIT { String pid = assertProcessIsRegistered(cluster.getName(), process.getName()); Referenceable processEntity = dgiCLient.getEntity(pid); + assertNotNull(processEntity); assertEquals(processEntity.get("processName"), process.getName()); Id inId = (Id) ((List)processEntity.get("inputs")).get(0); @@ -133,7 +129,60 @@ public class FalconHookIT { HiveMetaStoreBridge.getTableQualifiedName(cluster.getName(), outDbName, outTableName)); } -// @Test (enabled = true, dependsOnMethods = "testCreateProcess") + private Feed getTableFeed(String feedResource, String clusterName) throws Exception { + Feed feed = 
loadEntity(EntityType.FEED, feedResource, "feed" + random()); + org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0); + feedCluster.setName(clusterName); + feedCluster.getTable().setUri(getTableUri("db" + random(), "table" + random())); + STORE.publish(EntityType.FEED, feed); + return feed; + } + + private String getDBName(Feed feed) { + String uri = feed.getClusters().getClusters().get(0).getTable().getUri(); + String[] parts = uri.split(":"); + return parts[1]; + } + + private String getTableName(Feed feed) { + String uri = feed.getClusters().getClusters().get(0).getTable().getUri(); + String[] parts = uri.split(":"); + parts = parts[2].split("#"); + return parts[0]; + } + + @Test (enabled = true) + public void testCreateProcessWithHDFSFeed() throws Exception { + Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random()); + STORE.publish(EntityType.CLUSTER, cluster); + + Feed infeed = loadEntity(EntityType.FEED, FEED_HDFS_RESOURCE, "feed" + random()); + org.apache.falcon.entity.v0.feed.Cluster feedCluster = infeed.getClusters().getClusters().get(0); + feedCluster.setName(cluster.getName()); + STORE.publish(EntityType.FEED, infeed); + + Feed outfeed = getTableFeed(FEED_RESOURCE, cluster.getName()); + String outTableName = getTableName(outfeed); + String outDbName = getDBName(outfeed); + + Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE, "process" + random()); + process.getClusters().getClusters().get(0).setName(cluster.getName()); + process.getInputs().getInputs().get(0).setFeed(infeed.getName()); + process.getOutputs().getOutputs().get(0).setFeed(outfeed.getName()); + STORE.publish(EntityType.PROCESS, process); + + String pid = assertProcessIsRegistered(cluster.getName(), process.getName()); + Referenceable processEntity = dgiCLient.getEntity(pid); + assertEquals(processEntity.get("processName"), process.getName()); + assertNull(processEntity.get("inputs")); + + Id outId = 
(Id) ((List)processEntity.get("outputs")).get(0); + Referenceable outEntity = dgiCLient.getEntity(outId._getId()); + assertEquals(outEntity.get("name"), + HiveMetaStoreBridge.getTableQualifiedName(cluster.getName(), outDbName, outTableName)); + } + + // @Test (enabled = true, dependsOnMethods = "testCreateProcess") // public void testUpdateProcess() throws Exception { // FalconEvent event = createProcessEntity(PROCESS_NAME_2, INPUT, OUTPUT); // FalconEventPublisher.Data data = new FalconEventPublisher.Data(event); @@ -156,7 +205,7 @@ public class FalconHookIT { } private String assertEntityIsRegistered(final String query) throws Exception { - waitFor(20000, new Predicate() { + waitFor(2000000, new Predicate() { @Override public boolean evaluate() throws Exception { JSONArray results = dgiCLient.search(query); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/16107915/addons/falcon-bridge/src/test/resources/feed-hdfs.xml ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/test/resources/feed-hdfs.xml b/addons/falcon-bridge/src/test/resources/feed-hdfs.xml new file mode 100644 index 0000000..435db07 --- /dev/null +++ b/addons/falcon-bridge/src/test/resources/feed-hdfs.xml @@ -0,0 +1,39 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. + --> +<feed description="test input" name="testinput" xmlns="uri:falcon:feed:0.1"> + <groups>online,bi</groups> + + <frequency>hours(1)</frequency> + <timezone>UTC</timezone> + <late-arrival cut-off="hours(3)"/> + + <clusters> + <cluster name="testcluster" type="source"> + <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/> + <retention limit="hours(24)" action="delete"/> + </cluster> + </clusters> + + <locations> + <location type="data" path="/tmp/input/${YEAR}-${MONTH}-${DAY}-${HOUR}"/> + </locations> + + <ACL owner="testuser" group="group" permission="0x755"/> + <schema location="hcat" provider="hcat"/> +</feed> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/16107915/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 82ce633..9330850 100644 --- a/release-log.txt +++ b/release-log.txt @@ -11,6 +11,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags) ALL CHANGES: +ATLAS-537 Falcon hook failing when tried to submit a process which creates a hive table ( shwethags via sumasai) ATLAS-476 Update type attribute with Reserved characters updated the original type as unknown (yhemanth via shwethags) ATLAS-463 Disconnect inverse references ( dkantor via sumasai) ATLAS-479 Add description for different types during create time (guptaneeru via shwethags)
