Repository: incubator-atlas Updated Branches: refs/heads/master 857561a39 -> b65dd91c3
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java b/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java new file mode 100644 index 0000000..3028dde --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java @@ -0,0 +1,447 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.atlas.discovery; + +import com.google.common.collect.ImmutableList; +import org.apache.atlas.AtlasException; +import org.apache.atlas.BaseRepositoryTest; +import org.apache.atlas.RepositoryMetadataModule; +import org.apache.atlas.typesystem.ITypedReferenceableInstance; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.typesystem.exception.EntityNotFoundException; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.commons.collections.ArrayStack; +import org.apache.commons.lang.RandomStringUtils; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONObject; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +import javax.inject.Inject; +import java.util.Arrays; +import java.util.List; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.fail; + +/** + * Unit tests for DataSetLineageService. 
+ */ +@Guice(modules = RepositoryMetadataModule.class) +public class DataSetLineageServiceTest extends BaseRepositoryTest { + + @Inject + private DiscoveryService discoveryService; + + @Inject + private DataSetLineageService lineageService; + + @BeforeClass + public void setUp() throws Exception { + super.setUp(); + } + + @AfterClass + public void tearDown() throws Exception { + super.tearDown(); + } + + @DataProvider(name = "dslQueriesProvider") + private Object[][] createDSLQueries() { + return new String[][]{ + // joins + {"hive_table where name=\"sales_fact\", columns"}, + {"hive_table where name=\"sales_fact\", columns select name, dataType, comment"}, + {"hive_table where name=\"sales_fact\", columns as c select c.name, c.dataType, c.comment"}, + // {"hive_db as db where (db.name=\"Reporting\"), hive_table as table select db.name, + // table.name"}, + {"from hive_db"}, {"hive_db"}, {"hive_db where hive_db.name=\"Reporting\""}, + {"hive_db hive_db.name = \"Reporting\""}, + {"hive_db where hive_db.name=\"Reporting\" select name, owner"}, {"hive_db has name"}, + // {"hive_db, hive_table"}, + // {"hive_db, hive_process has name"}, + // {"hive_db as db1, hive_table where db1.name = \"Reporting\""}, + // {"hive_db where hive_db.name=\"Reporting\" and hive_db.createTime < " + System + // .currentTimeMillis()}, + {"from hive_table"}, {"hive_table"}, {"hive_table is Dimension"}, + {"hive_column where hive_column isa PII"}, + // {"hive_column where hive_column isa PII select hive_column.name"}, + {"hive_column select hive_column.name"}, {"hive_column select name"}, + {"hive_column where hive_column.name=\"customer_id\""}, {"from hive_table select hive_table.name"}, + {"hive_db where (name = \"Reporting\")"}, + {"hive_db where (name = \"Reporting\") select name as _col_0, owner as _col_1"}, + {"hive_db where hive_db has name"}, + // {"hive_db hive_table"}, + {"hive_db where hive_db has name"}, + // {"hive_db as db1 hive_table where (db1.name = \"Reporting\")"}, + 
{"hive_db where (name = \"Reporting\") select name as _col_0, (createTime + 1) as _col_1 "}, + // {"hive_db where (name = \"Reporting\") and ((createTime + 1) > 0)"}, + // {"hive_db as db1 hive_table as tab where ((db1.createTime + 1) > 0) and (db1.name = + // \"Reporting\") select db1.name as dbName, tab.name as tabName"}, + // {"hive_db as db1 hive_table as tab where ((db1.createTime + 1) > 0) or (db1.name = + // \"Reporting\") select db1.name as dbName, tab.name as tabName"}, + // {"hive_db as db1 hive_table as tab where ((db1.createTime + 1) > 0) and (db1.name = + // \"Reporting\") or db1 has owner select db1.name as dbName, tab.name as tabName"}, + // {"hive_db as db1 hive_table as tab where ((db1.createTime + 1) > 0) and (db1.name = + // \"Reporting\") or db1 has owner select db1.name as dbName, tab.name as tabName"}, + // trait searches + {"Dimension"}, {"Fact"}, {"ETL"}, {"Metric"}, {"PII"},}; + } + + @Test(dataProvider = "dslQueriesProvider") + public void testSearchByDSLQueries(String dslQuery) throws Exception { + System.out.println("Executing dslQuery = " + dslQuery); + String jsonResults = discoveryService.searchByDSL(dslQuery); + assertNotNull(jsonResults); + + JSONObject results = new JSONObject(jsonResults); + Assert.assertEquals(results.length(), 3); + System.out.println("results = " + results); + + Object query = results.get("query"); + assertNotNull(query); + + JSONObject dataType = results.getJSONObject("dataType"); + assertNotNull(dataType); + String typeName = dataType.getString("typeName"); + assertNotNull(typeName); + + JSONArray rows = results.getJSONArray("rows"); + assertNotNull(rows); + Assert.assertTrue(rows.length() >= 0); // some queries may not have any results + System.out.println("query [" + dslQuery + "] returned [" + rows.length() + "] rows"); + } + + @Test(dataProvider = "invalidArgumentsProvider") + public void testGetInputsGraphInvalidArguments(final String tableName, String expectedException) throws Exception { + 
testInvalidArguments(expectedException, new Invoker() { + @Override + void run() throws AtlasException { + lineageService.getInputsGraph(tableName); + } + }); + } + + @Test(dataProvider = "invalidArgumentsProvider") + public void testGetInputsGraphForEntityInvalidArguments(final String tableName, String expectedException) + throws Exception { + testInvalidArguments(expectedException, new Invoker() { + @Override + void run() throws AtlasException { + lineageService.getInputsGraphForEntity(tableName); + } + }); + } + + @Test + public void testGetInputsGraph() throws Exception { + JSONObject results = new JSONObject(lineageService.getInputsGraph("sales_fact_monthly_mv")); + assertNotNull(results); + System.out.println("inputs graph = " + results); + + JSONObject values = results.getJSONObject("values"); + assertNotNull(values); + + final JSONObject vertices = values.getJSONObject("vertices"); + Assert.assertEquals(vertices.length(), 4); + + final JSONObject edges = values.getJSONObject("edges"); + Assert.assertEquals(edges.length(), 4); + } + + @Test + public void testGetInputsGraphForEntity() throws Exception { + ITypedReferenceableInstance entity = + repository.getEntityDefinition(HIVE_TABLE_TYPE, "name", "sales_fact_monthly_mv"); + + JSONObject results = new JSONObject(lineageService.getInputsGraphForEntity(entity.getId()._getId())); + assertNotNull(results); + System.out.println("inputs graph = " + results); + + JSONObject values = results.getJSONObject("values"); + assertNotNull(values); + + final JSONObject vertices = values.getJSONObject("vertices"); + Assert.assertEquals(vertices.length(), 4); + + final JSONObject edges = values.getJSONObject("edges"); + Assert.assertEquals(edges.length(), 4); + } + + @Test(dataProvider = "invalidArgumentsProvider") + public void testGetOutputsGraphInvalidArguments(final String tableName, String expectedException) throws Exception { + testInvalidArguments(expectedException, new Invoker() { + @Override + void run() throws 
AtlasException { + lineageService.getOutputsGraph(tableName); + } + }); + } + + @Test(dataProvider = "invalidArgumentsProvider") + public void testGetOutputsGraphForEntityInvalidArguments(final String tableName, String expectedException) + throws Exception { + testInvalidArguments(expectedException, new Invoker() { + @Override + void run() throws AtlasException { + lineageService.getOutputsGraphForEntity(tableName); + } + }); + } + + @Test + public void testGetOutputsGraph() throws Exception { + JSONObject results = new JSONObject(lineageService.getOutputsGraph("sales_fact")); + assertNotNull(results); + System.out.println("outputs graph = " + results); + + JSONObject values = results.getJSONObject("values"); + assertNotNull(values); + + final JSONObject vertices = values.getJSONObject("vertices"); + Assert.assertEquals(vertices.length(), 3); + + final JSONObject edges = values.getJSONObject("edges"); + Assert.assertEquals(edges.length(), 4); + } + + @Test + public void testGetOutputsGraphForEntity() throws Exception { + ITypedReferenceableInstance entity = + repository.getEntityDefinition(HIVE_TABLE_TYPE, "name", "sales_fact"); + + JSONObject results = new JSONObject(lineageService.getOutputsGraphForEntity(entity.getId()._getId())); + assertNotNull(results); + System.out.println("outputs graph = " + results); + + JSONObject values = results.getJSONObject("values"); + assertNotNull(values); + + final JSONObject vertices = values.getJSONObject("vertices"); + Assert.assertEquals(vertices.length(), 3); + + final JSONObject edges = values.getJSONObject("edges"); + Assert.assertEquals(edges.length(), 4); + } + + @DataProvider(name = "tableNamesProvider") + private Object[][] tableNames() { + return new String[][]{{"sales_fact", "4"}, {"time_dim", "3"}, {"sales_fact_daily_mv", "4"}, + {"sales_fact_monthly_mv", "4"}}; + } + + @Test(dataProvider = "tableNamesProvider") + public void testGetSchema(String tableName, String expected) throws Exception { + JSONObject results = 
new JSONObject(lineageService.getSchema(tableName)); + assertNotNull(results); + System.out.println("columns = " + results); + + JSONArray rows = results.getJSONArray("rows"); + Assert.assertEquals(rows.length(), Integer.parseInt(expected)); + + for (int index = 0; index < rows.length(); index++) { + final JSONObject row = rows.getJSONObject(index); + assertNotNull(row.getString("name")); + assertNotNull(row.getString("comment")); + assertNotNull(row.getString("dataType")); + Assert.assertEquals(row.getString("$typeName$"), "hive_column"); + } + } + + @Test(dataProvider = "tableNamesProvider") + public void testGetSchemaForEntity(String tableName, String expected) throws Exception { + ITypedReferenceableInstance entity = + repository.getEntityDefinition(HIVE_TABLE_TYPE, "name", tableName); + + JSONObject results = new JSONObject(lineageService.getSchemaForEntity(entity.getId()._getId())); + assertNotNull(results); + System.out.println("columns = " + results); + + JSONArray rows = results.getJSONArray("rows"); + Assert.assertEquals(rows.length(), Integer.parseInt(expected)); + + for (int index = 0; index < rows.length(); index++) { + final JSONObject row = rows.getJSONObject(index); + assertNotNull(row.getString("name")); + assertNotNull(row.getString("comment")); + assertNotNull(row.getString("dataType")); + Assert.assertEquals(row.getString("$typeName$"), "hive_column"); + } + } + + @DataProvider(name = "invalidArgumentsProvider") + private Object[][] arguments() { + return new String[][]{{null, IllegalArgumentException.class.getName()}, + {"", IllegalArgumentException.class.getName()}, + {"blah", EntityNotFoundException.class.getName()}}; + } + + abstract class Invoker { + abstract void run() throws AtlasException; + } + + public void testInvalidArguments(String expectedException, Invoker invoker) throws Exception { + try { + invoker.run(); + fail("Expected " + expectedException); + } catch(Exception e) { + assertEquals(e.getClass().getName(), expectedException); 
+ } + } + + @Test(dataProvider = "invalidArgumentsProvider") + public void testGetSchemaInvalidArguments(final String tableName, String expectedException) throws Exception { + testInvalidArguments(expectedException, new Invoker() { + @Override + void run() throws AtlasException { + lineageService.getSchema(tableName); + } + }); + } + + @Test(dataProvider = "invalidArgumentsProvider") + public void testGetSchemaForEntityInvalidArguments(final String entityId, String expectedException) throws Exception { + testInvalidArguments(expectedException, new Invoker() { + @Override + void run() throws AtlasException { + lineageService.getSchemaForEntity(entityId); + } + }); + } + + @Test + public void testLineageWithDelete() throws Exception { + String tableName = "table" + random(); + createTable(tableName, 3, true); + + JSONObject results = new JSONObject(lineageService.getSchema(tableName)); + assertEquals(results.getJSONArray("rows").length(), 3); + + results = new JSONObject(lineageService.getInputsGraph(tableName)); + assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2); + + results = new JSONObject(lineageService.getOutputsGraph(tableName)); + assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2); + + String tableId = getEntityId(HIVE_TABLE_TYPE, "name", tableName); + + results = new JSONObject(lineageService.getSchemaForEntity(tableId)); + assertEquals(results.getJSONArray("rows").length(), 3); + + results = new JSONObject(lineageService.getInputsGraphForEntity(tableId)); + assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2); + + results = new JSONObject(lineageService.getOutputsGraphForEntity(tableId)); + assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2); + + //Delete the entity. Lineage for entity returns the same results as before. 
+ //Lineage for table name throws EntityNotFoundException + repository.deleteEntities(Arrays.asList(tableId)); + + results = new JSONObject(lineageService.getSchemaForEntity(tableId)); + assertEquals(results.getJSONArray("rows").length(), 3); + + results = new JSONObject(lineageService.getInputsGraphForEntity(tableId)); + assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2); + + results = new JSONObject(lineageService.getOutputsGraphForEntity(tableId)); + assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2); + + try { + lineageService.getSchema(tableName); + fail("Expected EntityNotFoundException"); + } catch (EntityNotFoundException e) { + //expected + } + + try { + lineageService.getInputsGraph(tableName); + fail("Expected EntityNotFoundException"); + } catch (EntityNotFoundException e) { + //expected + } + + try { + lineageService.getOutputsGraph(tableName); + fail("Expected EntityNotFoundException"); + } catch (EntityNotFoundException e) { + //expected + } + + //Create table again should show new lineage + createTable(tableName, 2, false); + results = new JSONObject(lineageService.getSchema(tableName)); + assertEquals(results.getJSONArray("rows").length(), 2); + + results = new JSONObject(lineageService.getOutputsGraph(tableName)); + assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 0); + + results = new JSONObject(lineageService.getInputsGraph(tableName)); + assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 0); + + tableId = getEntityId(HIVE_TABLE_TYPE, "name", tableName); + + results = new JSONObject(lineageService.getSchemaForEntity(tableId)); + assertEquals(results.getJSONArray("rows").length(), 2); + + results = new JSONObject(lineageService.getInputsGraphForEntity(tableId)); + assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 0); + + results = new 
JSONObject(lineageService.getOutputsGraphForEntity(tableId)); + assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 0); + } + + private void createTable(String tableName, int numCols, boolean createLineage) throws Exception { + String dbId = getEntityId(DATABASE_TYPE, "name", "Sales"); + Id salesDB = new Id(dbId, 0, DATABASE_TYPE); + + //Create the entity again and schema should return the new schema + List<Referenceable> columns = new ArrayStack(); + for (int i = 0; i < numCols; i++) { + columns.add(column("col" + random(), "int", "column descr")); + } + + Referenceable sd = + storageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true, + ImmutableList.of(column("time_id", "int", "time id"))); + + Id table = table(tableName, "test table", salesDB, sd, "fetl", "External", columns); + if (createLineage) { + Id inTable = table("table" + random(), "test table", salesDB, sd, "fetl", "External", columns); + Id outTable = table("table" + random(), "test table", salesDB, sd, "fetl", "External", columns); + loadProcess("process" + random(), "hive query for monthly summary", "Tim ETL", ImmutableList.of(inTable), + ImmutableList.of(table), "create table as select ", "plan", "id", "graph", "ETL"); + loadProcess("process" + random(), "hive query for monthly summary", "Tim ETL", ImmutableList.of(table), + ImmutableList.of(outTable), "create table as select ", "plan", "id", "graph", "ETL"); + } + } + + private String random() { + return RandomStringUtils.randomAlphanumeric(5); + } + + private String getEntityId(String typeName, String attributeName, String attributeValue) throws Exception { + return repository.getEntityDefinition(typeName, attributeName, attributeValue).getId()._getId(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java 
---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java b/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java index 3b50dfb..5e7de88 100755 --- a/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java @@ -20,7 +20,7 @@ package org.apache.atlas.discovery; import com.google.common.collect.ImmutableSet; -import org.apache.atlas.BaseHiveRepositoryTest; +import org.apache.atlas.BaseRepositoryTest; import org.apache.atlas.RepositoryMetadataModule; import org.apache.atlas.RequestContext; import org.apache.atlas.TestUtils; @@ -60,7 +60,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; @Guice(modules = RepositoryMetadataModule.class) -public class GraphBackedDiscoveryServiceTest extends BaseHiveRepositoryTest { +public class GraphBackedDiscoveryServiceTest extends BaseRepositoryTest { @Inject private MetadataRepository repositoryService; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/test/java/org/apache/atlas/discovery/HiveLineageServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/discovery/HiveLineageServiceTest.java b/repository/src/test/java/org/apache/atlas/discovery/HiveLineageServiceTest.java deleted file mode 100644 index 6d5a15a..0000000 --- a/repository/src/test/java/org/apache/atlas/discovery/HiveLineageServiceTest.java +++ /dev/null @@ -1,260 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.discovery; - -import org.apache.atlas.BaseHiveRepositoryTest; -import org.apache.atlas.RepositoryMetadataModule; -import org.apache.atlas.typesystem.exception.EntityNotFoundException; -import org.codehaus.jettison.json.JSONArray; -import org.codehaus.jettison.json.JSONObject; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Guice; -import org.testng.annotations.Test; - -import javax.inject.Inject; - -/** - * Unit tests for Hive LineageService. 
- */ -@Guice(modules = RepositoryMetadataModule.class) -public class HiveLineageServiceTest extends BaseHiveRepositoryTest { - - @Inject - private DiscoveryService discoveryService; - - @Inject - private HiveLineageService hiveLineageService; - - @BeforeClass - public void setUp() throws Exception { - super.setUp(); - } - - @AfterClass - public void tearDown() throws Exception { - super.tearDown(); - } - - @DataProvider(name = "dslQueriesProvider") - private Object[][] createDSLQueries() { - return new String[][]{ - // joins - {"hive_table where name=\"sales_fact\", columns"}, - {"hive_table where name=\"sales_fact\", columns select name, dataType, comment"}, - {"hive_table where name=\"sales_fact\", columns as c select c.name, c.dataType, c.comment"}, - // {"hive_db as db where (db.name=\"Reporting\"), hive_table as table select db.name, - // table.name"}, - {"from hive_db"}, {"hive_db"}, {"hive_db where hive_db.name=\"Reporting\""}, - {"hive_db hive_db.name = \"Reporting\""}, - {"hive_db where hive_db.name=\"Reporting\" select name, owner"}, {"hive_db has name"}, - // {"hive_db, hive_table"}, - // {"hive_db, hive_process has name"}, - // {"hive_db as db1, hive_table where db1.name = \"Reporting\""}, - // {"hive_db where hive_db.name=\"Reporting\" and hive_db.createTime < " + System - // .currentTimeMillis()}, - {"from hive_table"}, {"hive_table"}, {"hive_table is Dimension"}, - {"hive_column where hive_column isa PII"}, - // {"hive_column where hive_column isa PII select hive_column.name"}, - {"hive_column select hive_column.name"}, {"hive_column select name"}, - {"hive_column where hive_column.name=\"customer_id\""}, {"from hive_table select hive_table.name"}, - {"hive_db where (name = \"Reporting\")"}, - {"hive_db where (name = \"Reporting\") select name as _col_0, owner as _col_1"}, - {"hive_db where hive_db has name"}, - // {"hive_db hive_table"}, - {"hive_db where hive_db has name"}, - // {"hive_db as db1 hive_table where (db1.name = \"Reporting\")"}, - 
{"hive_db where (name = \"Reporting\") select name as _col_0, (createTime + 1) as _col_1 "}, - // {"hive_db where (name = \"Reporting\") and ((createTime + 1) > 0)"}, - // {"hive_db as db1 hive_table as tab where ((db1.createTime + 1) > 0) and (db1.name = - // \"Reporting\") select db1.name as dbName, tab.name as tabName"}, - // {"hive_db as db1 hive_table as tab where ((db1.createTime + 1) > 0) or (db1.name = - // \"Reporting\") select db1.name as dbName, tab.name as tabName"}, - // {"hive_db as db1 hive_table as tab where ((db1.createTime + 1) > 0) and (db1.name = - // \"Reporting\") or db1 has owner select db1.name as dbName, tab.name as tabName"}, - // {"hive_db as db1 hive_table as tab where ((db1.createTime + 1) > 0) and (db1.name = - // \"Reporting\") or db1 has owner select db1.name as dbName, tab.name as tabName"}, - // trait searches - {"Dimension"}, {"Fact"}, {"ETL"}, {"Metric"}, {"PII"},}; - } - - @Test(dataProvider = "dslQueriesProvider") - public void testSearchByDSLQueries(String dslQuery) throws Exception { - System.out.println("Executing dslQuery = " + dslQuery); - String jsonResults = discoveryService.searchByDSL(dslQuery); - Assert.assertNotNull(jsonResults); - - JSONObject results = new JSONObject(jsonResults); - Assert.assertEquals(results.length(), 3); - System.out.println("results = " + results); - - Object query = results.get("query"); - Assert.assertNotNull(query); - - JSONObject dataType = results.getJSONObject("dataType"); - Assert.assertNotNull(dataType); - String typeName = dataType.getString("typeName"); - Assert.assertNotNull(typeName); - - JSONArray rows = results.getJSONArray("rows"); - Assert.assertNotNull(rows); - Assert.assertTrue(rows.length() >= 0); // some queries may not have any results - System.out.println("query [" + dslQuery + "] returned [" + rows.length() + "] rows"); - } - - @Test - public void testGetInputs() throws Exception { - JSONObject results = new 
JSONObject(hiveLineageService.getInputs("sales_fact_monthly_mv")); - Assert.assertNotNull(results); - System.out.println("inputs = " + results); - - JSONArray rows = results.getJSONArray("rows"); - Assert.assertTrue(rows.length() > 0); - - final JSONObject row = rows.getJSONObject(0); - JSONArray paths = row.getJSONArray("path"); - Assert.assertTrue(paths.length() > 0); - } - - @Test(expectedExceptions = IllegalArgumentException.class) - public void testGetInputsTableNameNull() throws Exception { - hiveLineageService.getInputs(null); - Assert.fail(); - } - - @Test(expectedExceptions = IllegalArgumentException.class) - public void testGetInputsTableNameEmpty() throws Exception { - hiveLineageService.getInputs(""); - Assert.fail(); - } - - @Test(expectedExceptions = EntityNotFoundException.class) - public void testGetInputsBadTableName() throws Exception { - hiveLineageService.getInputs("blah"); - Assert.fail(); - } - - @Test - public void testGetInputsGraph() throws Exception { - JSONObject results = new JSONObject(hiveLineageService.getInputsGraph("sales_fact_monthly_mv")); - Assert.assertNotNull(results); - System.out.println("inputs graph = " + results); - - JSONObject values = results.getJSONObject("values"); - Assert.assertNotNull(values); - - final JSONObject vertices = values.getJSONObject("vertices"); - Assert.assertEquals(vertices.length(), 4); - - final JSONObject edges = values.getJSONObject("edges"); - Assert.assertEquals(edges.length(), 4); - } - - @Test - public void testGetOutputs() throws Exception { - JSONObject results = new JSONObject(hiveLineageService.getOutputs("sales_fact")); - Assert.assertNotNull(results); - System.out.println("outputs = " + results); - - JSONArray rows = results.getJSONArray("rows"); - Assert.assertTrue(rows.length() > 0); - - final JSONObject row = rows.getJSONObject(0); - JSONArray paths = row.getJSONArray("path"); - Assert.assertTrue(paths.length() > 0); - } - - @Test(expectedExceptions = IllegalArgumentException.class) 
- public void testGetOututsTableNameNull() throws Exception { - hiveLineageService.getOutputs(null); - Assert.fail(); - } - - @Test(expectedExceptions = IllegalArgumentException.class) - public void testGetOutputsTableNameEmpty() throws Exception { - hiveLineageService.getOutputs(""); - Assert.fail(); - } - - @Test(expectedExceptions = EntityNotFoundException.class) - public void testGetOutputsBadTableName() throws Exception { - hiveLineageService.getOutputs("blah"); - Assert.fail(); - } - - @Test - public void testGetOutputsGraph() throws Exception { - JSONObject results = new JSONObject(hiveLineageService.getOutputsGraph("sales_fact")); - Assert.assertNotNull(results); - System.out.println("outputs graph = " + results); - - JSONObject values = results.getJSONObject("values"); - Assert.assertNotNull(values); - - final JSONObject vertices = values.getJSONObject("vertices"); - Assert.assertEquals(vertices.length(), 3); - - final JSONObject edges = values.getJSONObject("edges"); - Assert.assertEquals(edges.length(), 4); - } - - @DataProvider(name = "tableNamesProvider") - private Object[][] tableNames() { - return new String[][]{{"sales_fact", "4"}, {"time_dim", "3"}, {"sales_fact_daily_mv", "4"}, - {"sales_fact_monthly_mv", "4"}}; - } - - @Test(dataProvider = "tableNamesProvider") - public void testGetSchema(String tableName, String expected) throws Exception { - JSONObject results = new JSONObject(hiveLineageService.getSchema(tableName)); - Assert.assertNotNull(results); - System.out.println("columns = " + results); - - JSONArray rows = results.getJSONArray("rows"); - Assert.assertEquals(rows.length(), Integer.parseInt(expected)); - - for (int index = 0; index < rows.length(); index++) { - final JSONObject row = rows.getJSONObject(index); - Assert.assertNotNull(row.getString("name")); - Assert.assertNotNull(row.getString("comment")); - Assert.assertNotNull(row.getString("dataType")); - Assert.assertEquals(row.getString("$typeName$"), "hive_column"); - } - } - - 
@Test(expectedExceptions = IllegalArgumentException.class) - public void testGetSchemaTableNameNull() throws Exception { - hiveLineageService.getSchema(null); - Assert.fail(); - } - - @Test(expectedExceptions = IllegalArgumentException.class) - public void testGetSchemaTableNameEmpty() throws Exception { - hiveLineageService.getSchema(""); - Assert.fail(); - } - - @Test(expectedExceptions = EntityNotFoundException.class) - public void testGetSchemaBadTableName() throws Exception { - hiveLineageService.getSchema("blah"); - Assert.fail(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala ---------------------------------------------------------------------- diff --git a/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala b/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala index 2fd8bb9..f65cedb 100755 --- a/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala +++ b/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala @@ -107,7 +107,7 @@ class GremlinTest2 extends BaseGremlinTest { } @Test def testHighLevelLineage { - val r = HiveLineageQuery("Table", "sales_fact_monthly_mv", + val r = InputLineageClosureQuery("Table", "name", "sales_fact_monthly_mv", "LoadProcess", "inputTables", "outputTable", @@ -116,7 +116,7 @@ class GremlinTest2 extends BaseGremlinTest { } @Test def testHighLevelLineageReturnGraph { - val r = HiveLineageQuery("Table", "sales_fact_monthly_mv", + val r = InputLineageClosureQuery("Table", "name", "sales_fact_monthly_mv", "LoadProcess", "inputTables", "outputTable", @@ -127,7 +127,7 @@ class GremlinTest2 extends BaseGremlinTest { } @Test def testHighLevelWhereUsed { - val r = HiveWhereUsedQuery("Table", "sales_fact", + val r = OutputLineageClosureQuery("Table", "name", "sales_fact", "LoadProcess", "inputTables", "outputTable", @@ -136,7 +136,7 @@ class GremlinTest2 extends BaseGremlinTest { } 
@Test def testHighLevelWhereUsedReturnGraph { - val r = HiveWhereUsedQuery("Table", "sales_fact", + val r = OutputLineageClosureQuery("Table", "name", "sales_fact", "LoadProcess", "inputTables", "outputTable", http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/server-api/src/main/java/org/apache/atlas/discovery/LineageService.java ---------------------------------------------------------------------- diff --git a/server-api/src/main/java/org/apache/atlas/discovery/LineageService.java b/server-api/src/main/java/org/apache/atlas/discovery/LineageService.java index 8dc36cd..5aab355 100644 --- a/server-api/src/main/java/org/apache/atlas/discovery/LineageService.java +++ b/server-api/src/main/java/org/apache/atlas/discovery/LineageService.java @@ -26,42 +26,50 @@ import org.apache.atlas.AtlasException; public interface LineageService { /** - * Return the lineage outputs for the given tableName. + * Return the lineage outputs graph for the given datasetName. * - * @param tableName tableName - * @return Outputs as JSON + * @param datasetName datasetName + * @return Outputs Graph as JSON */ - String getOutputs(String tableName) throws AtlasException; + String getOutputsGraph(String datasetName) throws AtlasException; /** - * Return the lineage outputs graph for the given tableName. + * Return the lineage inputs graph for the given datasetName. * - * @param tableName tableName - * @return Outputs Graph as JSON + * @param datasetName datasetName + * @return Inputs Graph as JSON */ - String getOutputsGraph(String tableName) throws AtlasException; + String getInputsGraph(String datasetName) throws AtlasException; /** - * Return the lineage inputs for the given tableName. + * Return the lineage inputs graph for the given entity id. 
* - * @param tableName tableName - * @return Inputs as JSON + * @param guid entity id + * @return Inputs Graph as JSON */ - String getInputs(String tableName) throws AtlasException; + String getInputsGraphForEntity(String guid) throws AtlasException; /** - * Return the lineage inputs graph for the given tableName. + * Return the lineage outputs graph for the given entity id. * - * @param tableName tableName + * @param guid entity id * @return Inputs Graph as JSON */ - String getInputsGraph(String tableName) throws AtlasException; + String getOutputsGraphForEntity(String guid) throws AtlasException; + + /** + * Return the schema for the given datasetName. + * + * @param datasetName datasetName + * @return Schema as JSON + */ + String getSchema(String datasetName) throws AtlasException; /** - * Return the schema for the given tableName. + * Return the schema for the given entity id. * - * @param tableName tableName + * @param guid entity id * @return Schema as JSON */ - String getSchema(String tableName) throws AtlasException; + String getSchemaForEntity(String guid) throws AtlasException; } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/typesystem/src/main/resources/atlas-application.properties ---------------------------------------------------------------------- diff --git a/typesystem/src/main/resources/atlas-application.properties b/typesystem/src/main/resources/atlas-application.properties index aafad0f..a8e77bb 100644 --- a/typesystem/src/main/resources/atlas-application.properties +++ b/typesystem/src/main/resources/atlas-application.properties @@ -49,14 +49,8 @@ atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address} ######### Hive Lineage Configs ######### -# This models reflects the base super types for Data and Process -#atlas.lineage.hive.table.type.name=DataSet -#atlas.lineage.hive.process.type.name=Process -#atlas.lineage.hive.process.inputs.name=inputs -#atlas.lineage.hive.process.outputs.name=outputs - ## Schema 
-atlas.lineage.hive.table.schema.query.hive_table=hive_table where name='%s'\, columns +atlas.lineage.schema.query.hive_table=hive_table where __guid='%s'\, columns ######### Notification Configs ######### atlas.notification.embedded=true http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/webapp/src/main/java/org/apache/atlas/web/resources/DataSetLineageResource.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/DataSetLineageResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/DataSetLineageResource.java new file mode 100644 index 0000000..bb7fe46 --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/web/resources/DataSetLineageResource.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.atlas.web.resources; + +import org.apache.atlas.AtlasClient; +import org.apache.atlas.discovery.DiscoveryException; +import org.apache.atlas.discovery.LineageService; +import org.apache.atlas.typesystem.exception.EntityNotFoundException; +import org.apache.atlas.web.util.Servlets; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Inject; +import javax.inject.Singleton; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.Response; + +/** + * Jersey Resource for Hive Table Lineage. + */ +@Path("lineage/hive") +@Singleton +public class DataSetLineageResource { + + private static final Logger LOG = LoggerFactory.getLogger(DataSetLineageResource.class); + + private final LineageService lineageService; + + /** + * Created by the Guice ServletModule and injected with the + * configured LineageService. + * + * @param lineageService lineage service handle + */ + @Inject + public DataSetLineageResource(LineageService lineageService) { + this.lineageService = lineageService; + } + + /** + * Returns the inputs graph for a given entity. 
+ * + * @param tableName table name + */ + @GET + @Path("table/{tableName}/inputs/graph") + @Consumes(Servlets.JSON_MEDIA_TYPE) + @Produces(Servlets.JSON_MEDIA_TYPE) + public Response inputsGraph(@Context HttpServletRequest request, @PathParam("tableName") String tableName) { + LOG.info("Fetching lineage inputs graph for tableName={}", tableName); + + try { + final String jsonResult = lineageService.getInputsGraph(tableName); + + JSONObject response = new JSONObject(); + response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId()); + response.put("tableName", tableName); + response.put(AtlasClient.RESULTS, new JSONObject(jsonResult)); + + return Response.ok(response).build(); + } catch (EntityNotFoundException e) { + LOG.error("table entity not found for {}", tableName, e); + throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND)); + } catch (DiscoveryException | IllegalArgumentException e) { + LOG.error("Unable to get lineage inputs graph for table {}", tableName, e); + throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST)); + } catch (Throwable e) { + LOG.error("Unable to get lineage inputs graph for table {}", tableName, e); + throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR)); + } + } + + /** + * Returns the outputs graph for a given entity. 
+ * + * @param tableName table name + */ + @GET + @Path("table/{tableName}/outputs/graph") + @Consumes(Servlets.JSON_MEDIA_TYPE) + @Produces(Servlets.JSON_MEDIA_TYPE) + public Response outputsGraph(@Context HttpServletRequest request, @PathParam("tableName") String tableName) { + LOG.info("Fetching lineage outputs graph for tableName={}", tableName); + + try { + final String jsonResult = lineageService.getOutputsGraph(tableName); + + JSONObject response = new JSONObject(); + response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId()); + response.put("tableName", tableName); + response.put(AtlasClient.RESULTS, new JSONObject(jsonResult)); + + return Response.ok(response).build(); + } catch (EntityNotFoundException e) { + LOG.error("table entity not found for {}", tableName, e); + throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND)); + } catch (DiscoveryException | IllegalArgumentException e) { + LOG.error("Unable to get lineage outputs graph for table {}", tableName, e); + throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST)); + } catch (Throwable e) { + LOG.error("Unable to get lineage outputs graph for table {}", tableName, e); + throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR)); + } + } + + /** + * Return the schema for the given tableName. 
+ * + * @param tableName table name + */ + @GET + @Path("table/{tableName}/schema") + @Consumes(Servlets.JSON_MEDIA_TYPE) + @Produces(Servlets.JSON_MEDIA_TYPE) + public Response schema(@Context HttpServletRequest request, @PathParam("tableName") String tableName) { + LOG.info("Fetching schema for tableName={}", tableName); + + try { + final String jsonResult = lineageService.getSchema(tableName); + + JSONObject response = new JSONObject(); + response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId()); + response.put("tableName", tableName); + response.put(AtlasClient.RESULTS, new JSONObject(jsonResult)); + + return Response.ok(response).build(); + } catch (EntityNotFoundException e) { + LOG.error("table entity not found for {}", tableName, e); + throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND)); + } catch (DiscoveryException | IllegalArgumentException e) { + LOG.error("Unable to get schema for table {}", tableName, e); + throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST)); + } catch (Throwable e) { + LOG.error("Unable to get schema for table {}", tableName, e); + throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/webapp/src/main/java/org/apache/atlas/web/resources/HiveLineageResource.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/HiveLineageResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/HiveLineageResource.java deleted file mode 100644 index 9b3fbc9..0000000 --- a/webapp/src/main/java/org/apache/atlas/web/resources/HiveLineageResource.java +++ /dev/null @@ -1,166 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.web.resources; - -import org.apache.atlas.AtlasClient; -import org.apache.atlas.typesystem.exception.EntityNotFoundException; -import org.apache.atlas.utils.ParamChecker; -import org.apache.atlas.discovery.DiscoveryException; -import org.apache.atlas.discovery.LineageService; -import org.apache.atlas.web.util.Servlets; -import org.codehaus.jettison.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.inject.Inject; -import javax.inject.Singleton; -import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.Consumes; -import javax.ws.rs.GET; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.Response; - -/** - * Jersey Resource for Hive Table Lineage. - */ -@Path("lineage/hive") -@Singleton -public class HiveLineageResource { - - private static final Logger LOG = LoggerFactory.getLogger(HiveLineageResource.class); - - private final LineageService lineageService; - - /** - * Created by the Guice ServletModule and injected with the - * configured LineageService. 
- * - * @param lineageService lineage service handle - */ - @Inject - public HiveLineageResource(LineageService lineageService) { - this.lineageService = lineageService; - } - - /** - * Returns the inputs graph for a given entity. - * - * @param tableName table name - */ - @GET - @Path("table/{tableName}/inputs/graph") - @Consumes(Servlets.JSON_MEDIA_TYPE) - @Produces(Servlets.JSON_MEDIA_TYPE) - public Response inputsGraph(@Context HttpServletRequest request, @PathParam("tableName") String tableName) { - LOG.info("Fetching lineage inputs graph for tableName={}", tableName); - - try { - ParamChecker.notEmpty(tableName, "table name cannot be null"); - final String jsonResult = lineageService.getInputsGraph(tableName); - - JSONObject response = new JSONObject(); - response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId()); - response.put("tableName", tableName); - response.put(AtlasClient.RESULTS, new JSONObject(jsonResult)); - - return Response.ok(response).build(); - } catch (EntityNotFoundException e) { - LOG.error("table entity not found for {}", tableName, e); - throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND)); - } catch (DiscoveryException | IllegalArgumentException e) { - LOG.error("Unable to get lineage inputs graph for table {}", tableName, e); - throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST)); - } catch (Throwable e) { - LOG.error("Unable to get lineage inputs graph for table {}", tableName, e); - throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR)); - } - } - - /** - * Returns the outputs graph for a given entity. 
- * - * @param tableName table name - */ - @GET - @Path("table/{tableName}/outputs/graph") - @Consumes(Servlets.JSON_MEDIA_TYPE) - @Produces(Servlets.JSON_MEDIA_TYPE) - public Response outputsGraph(@Context HttpServletRequest request, @PathParam("tableName") String tableName) { - LOG.info("Fetching lineage outputs graph for tableName={}", tableName); - - try { - ParamChecker.notEmpty(tableName, "table name cannot be null"); - final String jsonResult = lineageService.getOutputsGraph(tableName); - - JSONObject response = new JSONObject(); - response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId()); - response.put("tableName", tableName); - response.put(AtlasClient.RESULTS, new JSONObject(jsonResult)); - - return Response.ok(response).build(); - } catch (EntityNotFoundException e) { - LOG.error("table entity not found for {}", tableName, e); - throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND)); - } catch (DiscoveryException | IllegalArgumentException e) { - LOG.error("Unable to get lineage outputs graph for table {}", tableName, e); - throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST)); - } catch (Throwable e) { - LOG.error("Unable to get lineage outputs graph for table {}", tableName, e); - throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR)); - } - } - - /** - * Return the schema for the given tableName. 
- * - * @param tableName table name - */ - @GET - @Path("table/{tableName}/schema") - @Consumes(Servlets.JSON_MEDIA_TYPE) - @Produces(Servlets.JSON_MEDIA_TYPE) - public Response schema(@Context HttpServletRequest request, @PathParam("tableName") String tableName) { - LOG.info("Fetching schema for tableName={}", tableName); - - try { - ParamChecker.notEmpty(tableName, "table name cannot be null"); - final String jsonResult = lineageService.getSchema(tableName); - - JSONObject response = new JSONObject(); - response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId()); - response.put("tableName", tableName); - response.put(AtlasClient.RESULTS, new JSONObject(jsonResult)); - - return Response.ok(response).build(); - } catch (EntityNotFoundException e) { - LOG.error("table entity not found for {}", tableName, e); - throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND)); - } catch (DiscoveryException | IllegalArgumentException e) { - LOG.error("Unable to get schema for table {}", tableName, e); - throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST)); - } catch (Throwable e) { - LOG.error("Unable to get schema for table {}", tableName, e); - throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java new file mode 100644 index 0000000..7c92c33 --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.web.resources; + +import org.apache.atlas.AtlasClient; +import org.apache.atlas.discovery.DiscoveryException; +import org.apache.atlas.discovery.LineageService; +import org.apache.atlas.typesystem.exception.EntityNotFoundException; +import org.apache.atlas.web.util.Servlets; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Inject; +import javax.inject.Singleton; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; + +@Path("lineage") +@Singleton +public class LineageResource { + private static final Logger LOG = LoggerFactory.getLogger(DataSetLineageResource.class); + + private final LineageService lineageService; + + /** + * Created by the Guice ServletModule and injected with the + * configured LineageService. + * + * @param lineageService lineage service handle + */ + @Inject + public LineageResource(LineageService lineageService) { + this.lineageService = lineageService; + } + + /** + * Returns input lineage graph for the given entity id. 
+ * @param guid dataset entity id + * @return JSON response containing the request id and the inputs lineage graph + */ + @GET + @Path("{guid}/inputs/graph") + @Consumes(Servlets.JSON_MEDIA_TYPE) + @Produces(Servlets.JSON_MEDIA_TYPE) + public Response inputsGraph(@PathParam("guid") String guid) { + LOG.info("Fetching lineage inputs graph for guid={}", guid); + + try { + final String jsonResult = lineageService.getInputsGraphForEntity(guid); + + JSONObject response = new JSONObject(); + response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId()); + response.put(AtlasClient.RESULTS, new JSONObject(jsonResult)); + + return Response.ok(response).build(); + } catch (EntityNotFoundException e) { + LOG.error("entity not found for guid={}", guid, e); + throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND)); + } catch (DiscoveryException | IllegalArgumentException e) { + LOG.error("Unable to get lineage inputs graph for entity guid={}", guid, e); + throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST)); + } catch (Throwable e) { + LOG.error("Unable to get lineage inputs graph for entity guid={}", guid, e); + throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR)); + } + } + + /** + * Returns the outputs graph for a given entity id. 
+ * + * @param guid dataset entity id + */ + @GET + @Path("{guid}/outputs/graph") + @Consumes(Servlets.JSON_MEDIA_TYPE) + @Produces(Servlets.JSON_MEDIA_TYPE) + public Response outputsGraph(@PathParam("guid") String guid) { + LOG.info("Fetching lineage outputs graph for entity guid={}", guid); + + try { + final String jsonResult = lineageService.getOutputsGraphForEntity(guid); + + JSONObject response = new JSONObject(); + response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId()); + response.put(AtlasClient.RESULTS, new JSONObject(jsonResult)); + + return Response.ok(response).build(); + } catch (EntityNotFoundException e) { + LOG.error("table entity not found for {}", guid, e); + throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND)); + } catch (DiscoveryException | IllegalArgumentException e) { + LOG.error("Unable to get lineage outputs graph for entity guid={}", guid, e); + throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST)); + } catch (Throwable e) { + LOG.error("Unable to get lineage outputs graph for entity guid={}", guid, e); + throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR)); + } + } + + /** + * Returns the schema for the given dataset id. 
+ * + * @param guid dataset entity id + */ + @GET + @Path("{guid}/schema") + @Consumes(Servlets.JSON_MEDIA_TYPE) + @Produces(Servlets.JSON_MEDIA_TYPE) + public Response schema(@PathParam("guid") String guid) { + LOG.info("Fetching schema for entity guid={}", guid); + + try { + final String jsonResult = lineageService.getSchemaForEntity(guid); + + JSONObject response = new JSONObject(); + response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId()); + response.put(AtlasClient.RESULTS, new JSONObject(jsonResult)); + + return Response.ok(response).build(); + } catch (EntityNotFoundException e) { + LOG.error("table entity not found for {}", guid, e); + throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND)); + } catch (DiscoveryException | IllegalArgumentException e) { + LOG.error("Unable to get schema for entity guid={}", guid, e); + throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST)); + } catch (Throwable e) { + LOG.error("Unable to get schema for entity={}", guid, e); + throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/webapp/src/test/java/org/apache/atlas/web/resources/DataSetLineageJerseyResourceIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/DataSetLineageJerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/DataSetLineageJerseyResourceIT.java new file mode 100644 index 0000000..41f7c31 --- /dev/null +++ b/webapp/src/test/java/org/apache/atlas/web/resources/DataSetLineageJerseyResourceIT.java @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.web.resources; + +import com.google.common.collect.ImmutableList; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import org.apache.atlas.AtlasClient; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.atlas.web.util.Servlets; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONObject; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import javax.ws.rs.HttpMethod; +import javax.ws.rs.core.Response; +import java.util.List; + +/** + * Hive Lineage Integration Tests. 
+ */ +public class DataSetLineageJerseyResourceIT extends BaseResourceIT { + + private static final String BASE_URI = "api/atlas/lineage/hive/table/"; + private String salesFactTable; + private String salesMonthlyTable; + + @BeforeClass + public void setUp() throws Exception { + super.setUp(); + + createTypeDefinitions(); + setupInstances(); + } + + @Test + public void testInputsGraph() throws Exception { + WebResource resource = service.path(BASE_URI).path(salesMonthlyTable).path("inputs").path("graph"); + + ClientResponse clientResponse = resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE) + .method(HttpMethod.GET, ClientResponse.class); + Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode()); + + String responseAsString = clientResponse.getEntity(String.class); + Assert.assertNotNull(responseAsString); + System.out.println("inputs graph = " + responseAsString); + + JSONObject response = new JSONObject(responseAsString); + Assert.assertNotNull(response.get(AtlasClient.REQUEST_ID)); + + JSONObject results = response.getJSONObject(AtlasClient.RESULTS); + Assert.assertNotNull(results); + + JSONObject values = results.getJSONObject("values"); + Assert.assertNotNull(values); + + final JSONObject vertices = values.getJSONObject("vertices"); + Assert.assertEquals(vertices.length(), 4); + + final JSONObject edges = values.getJSONObject("edges"); + Assert.assertEquals(edges.length(), 4); + } + + @Test + public void testInputsGraphForEntity() throws Exception { + String tableId = serviceClient.getEntity(HIVE_TABLE_TYPE, "name", salesMonthlyTable).getId()._getId(); + JSONObject results = serviceClient.getInputGraphForEntity(tableId); + Assert.assertNotNull(results); + + JSONObject values = results.getJSONObject("values"); + Assert.assertNotNull(values); + + final JSONObject vertices = values.getJSONObject("vertices"); + Assert.assertEquals(vertices.length(), 4); + + final JSONObject edges = values.getJSONObject("edges"); + 
Assert.assertEquals(edges.length(), 4); + } + + @Test + public void testOutputsGraph() throws Exception { + WebResource resource = service.path(BASE_URI).path(salesFactTable).path("outputs").path("graph"); + + ClientResponse clientResponse = resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE) + .method(HttpMethod.GET, ClientResponse.class); + Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode()); + + String responseAsString = clientResponse.getEntity(String.class); + Assert.assertNotNull(responseAsString); + System.out.println("outputs graph= " + responseAsString); + + JSONObject response = new JSONObject(responseAsString); + Assert.assertNotNull(response.get(AtlasClient.REQUEST_ID)); + + JSONObject results = response.getJSONObject(AtlasClient.RESULTS); + Assert.assertNotNull(results); + + JSONObject values = results.getJSONObject("values"); + Assert.assertNotNull(values); + + final JSONObject vertices = values.getJSONObject("vertices"); + Assert.assertEquals(vertices.length(), 3); + + final JSONObject edges = values.getJSONObject("edges"); + Assert.assertEquals(edges.length(), 4); + } + + @Test + public void testOutputsGraphForEntity() throws Exception { + String tableId = serviceClient.getEntity(HIVE_TABLE_TYPE, "name", salesFactTable).getId()._getId(); + JSONObject results = serviceClient.getOutputGraphForEntity(tableId); + Assert.assertNotNull(results); + + JSONObject values = results.getJSONObject("values"); + Assert.assertNotNull(values); + + final JSONObject vertices = values.getJSONObject("vertices"); + Assert.assertEquals(vertices.length(), 3); + + final JSONObject edges = values.getJSONObject("edges"); + Assert.assertEquals(edges.length(), 4); + } + + @Test + public void testSchema() throws Exception { + WebResource resource = service.path(BASE_URI).path(salesFactTable).path("schema"); + + ClientResponse clientResponse = resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE) + 
.method(HttpMethod.GET, ClientResponse.class); + Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode()); + + String responseAsString = clientResponse.getEntity(String.class); + Assert.assertNotNull(responseAsString); + System.out.println("schema = " + responseAsString); + + JSONObject response = new JSONObject(responseAsString); + Assert.assertNotNull(response.get(AtlasClient.REQUEST_ID)); + + JSONObject results = response.getJSONObject(AtlasClient.RESULTS); + Assert.assertNotNull(results); + + JSONArray rows = results.getJSONArray("rows"); + Assert.assertEquals(rows.length(), 4); + + for (int index = 0; index < rows.length(); index++) { + final JSONObject row = rows.getJSONObject(index); + Assert.assertNotNull(row.getString("name")); + Assert.assertNotNull(row.getString("comment")); + Assert.assertNotNull(row.getString("dataType")); + Assert.assertEquals(row.getString("$typeName$"), "hive_column"); + } + } + + @Test + public void testSchemaForEntity() throws Exception { + String tableId = serviceClient.getEntity(HIVE_TABLE_TYPE, "name", salesFactTable).getId()._getId(); + JSONObject results = serviceClient.getSchemaForEntity(tableId); + Assert.assertNotNull(results); + + JSONArray rows = results.getJSONArray("rows"); + Assert.assertEquals(rows.length(), 4); + + for (int index = 0; index < rows.length(); index++) { + final JSONObject row = rows.getJSONObject(index); + Assert.assertNotNull(row.getString("name")); + Assert.assertNotNull(row.getString("comment")); + Assert.assertNotNull(row.getString("dataType")); + Assert.assertEquals(row.getString("$typeName$"), "hive_column"); + } + } + + @Test + public void testSchemaForEmptyTable() throws Exception { + WebResource resource = service.path(BASE_URI).path("").path("schema"); + + ClientResponse clientResponse = resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE) + .method(HttpMethod.GET, ClientResponse.class); + Assert.assertEquals(clientResponse.getStatus(), 
Response.Status.NOT_FOUND.getStatusCode()); + } + + @Test + public void testSchemaForInvalidTable() throws Exception { + WebResource resource = service.path(BASE_URI).path("blah").path("schema"); + + ClientResponse clientResponse = resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE) + .method(HttpMethod.GET, ClientResponse.class); + Assert.assertEquals(clientResponse.getStatus(), Response.Status.NOT_FOUND.getStatusCode()); + } + + private void setupInstances() throws Exception { + Id salesDB = database("Sales" + randomString(), "Sales Database", "John ETL", + "hdfs://host:8000/apps/warehouse/sales"); + + List<Referenceable> salesFactColumns = ImmutableList + .of(column("time_id", "int", "time id"), column("product_id", "int", "product id"), + column("customer_id", "int", "customer id", "pii"), + column("sales", "double", "product id", "Metric")); + + salesFactTable = "sales_fact" + randomString(); + Id salesFact = table(salesFactTable, "sales fact table", salesDB, "Joe", "MANAGED", salesFactColumns, "Fact"); + + List<Referenceable> timeDimColumns = ImmutableList + .of(column("time_id", "int", "time id"), column("dayOfYear", "int", "day Of Year"), + column("weekDay", "int", "week Day")); + + Id timeDim = + table("time_dim" + randomString(), "time dimension table", salesDB, "John Doe", "EXTERNAL", + timeDimColumns, "Dimension"); + + Id reportingDB = + database("Reporting" + randomString(), "reporting database", "Jane BI", + "hdfs://host:8000/apps/warehouse/reporting"); + + Id salesFactDaily = + table("sales_fact_daily_mv" + randomString(), "sales fact daily materialized view", reportingDB, + "Joe BI", "MANAGED", salesFactColumns, "Metric"); + + loadProcess("loadSalesDaily" + randomString(), "John ETL", ImmutableList.of(salesFact, timeDim), + ImmutableList.of(salesFactDaily), "create table as select ", "plan", "id", "graph", "ETL"); + + salesMonthlyTable = "sales_fact_monthly_mv" + randomString(); + Id salesFactMonthly = + table(salesMonthlyTable, 
"sales fact monthly materialized view", reportingDB, "Jane BI", + "MANAGED", salesFactColumns, "Metric"); + + loadProcess("loadSalesMonthly" + randomString(), "John ETL", ImmutableList.of(salesFactDaily), + ImmutableList.of(salesFactMonthly), "create table as select ", "plan", "id", "graph", "ETL"); + } + + Id database(String name, String description, String owner, String locationUri, String... traitNames) + throws Exception { + Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames); + referenceable.set("name", name); + referenceable.set("description", description); + referenceable.set("owner", owner); + referenceable.set("locationUri", locationUri); + referenceable.set("createTime", System.currentTimeMillis()); + + return createInstance(referenceable); + } + + Referenceable column(String name, String dataType, String comment, String... traitNames) throws Exception { + Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames); + referenceable.set("name", name); + referenceable.set("dataType", dataType); + referenceable.set("comment", comment); + + return referenceable; + } + + Id table(String name, String description, Id dbId, String owner, String tableType, List<Referenceable> columns, + String... 
traitNames) throws Exception { + Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames); + referenceable.set("name", name); + referenceable.set("description", description); + referenceable.set("owner", owner); + referenceable.set("tableType", tableType); + referenceable.set("createTime", System.currentTimeMillis()); + referenceable.set("lastAccessTime", System.currentTimeMillis()); + referenceable.set("retention", System.currentTimeMillis()); + + referenceable.set("db", dbId); + referenceable.set("columns", columns); + + return createInstance(referenceable); + } + + Id loadProcess(String name, String user, List<Id> inputTables, List<Id> outputTables, String queryText, + String queryPlan, String queryId, String queryGraph, String... traitNames) throws Exception { + Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames); + referenceable.set("name", name); + referenceable.set("qualifiedName", name); + referenceable.set("user", user); + referenceable.set("startTime", System.currentTimeMillis()); + referenceable.set("endTime", System.currentTimeMillis() + 10000); + + referenceable.set("inputs", inputTables); + referenceable.set("outputs", outputTables); + + referenceable.set("queryText", queryText); + referenceable.set("queryPlan", queryPlan); + referenceable.set("queryId", queryId); + referenceable.set("queryGraph", queryGraph); + + return createInstance(referenceable); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/webapp/src/test/java/org/apache/atlas/web/resources/HiveLineageJerseyResourceIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/HiveLineageJerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/HiveLineageJerseyResourceIT.java deleted file mode 100644 index 0fb5ea2..0000000 --- a/webapp/src/test/java/org/apache/atlas/web/resources/HiveLineageJerseyResourceIT.java +++ 
/dev/null @@ -1,257 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.web.resources; - -import com.google.common.collect.ImmutableList; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.WebResource; -import org.apache.atlas.AtlasClient; -import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.typesystem.persistence.Id; -import org.apache.atlas.web.util.Servlets; -import org.codehaus.jettison.json.JSONArray; -import org.codehaus.jettison.json.JSONObject; -import org.testng.Assert; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -import javax.ws.rs.HttpMethod; -import javax.ws.rs.core.Response; -import java.util.List; - -/** - * Hive Lineage Integration Tests. 
- */ -public class HiveLineageJerseyResourceIT extends BaseResourceIT { - - private static final String BASE_URI = "api/atlas/lineage/hive/table/"; - private String salesFactTable; - private String salesMonthlyTable; - - @BeforeClass - public void setUp() throws Exception { - super.setUp(); - - createTypeDefinitions(); - setupInstances(); - } - - @Test - public void testInputsGraph() throws Exception { - WebResource resource = service.path(BASE_URI).path(salesMonthlyTable).path("inputs").path("graph"); - - ClientResponse clientResponse = resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE) - .method(HttpMethod.GET, ClientResponse.class); - Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode()); - - String responseAsString = clientResponse.getEntity(String.class); - Assert.assertNotNull(responseAsString); - System.out.println("inputs graph = " + responseAsString); - - JSONObject response = new JSONObject(responseAsString); - Assert.assertNotNull(response.get(AtlasClient.REQUEST_ID)); - - JSONObject results = response.getJSONObject(AtlasClient.RESULTS); - Assert.assertNotNull(results); - - JSONObject values = results.getJSONObject("values"); - Assert.assertNotNull(values); - - final JSONObject vertices = values.getJSONObject("vertices"); - Assert.assertEquals(vertices.length(), 4); - - final JSONObject edges = values.getJSONObject("edges"); - Assert.assertEquals(edges.length(), 4); - } - - @Test - public void testOutputsGraph() throws Exception { - WebResource resource = service.path(BASE_URI).path(salesFactTable).path("outputs").path("graph"); - - ClientResponse clientResponse = resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE) - .method(HttpMethod.GET, ClientResponse.class); - Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode()); - - String responseAsString = clientResponse.getEntity(String.class); - Assert.assertNotNull(responseAsString); - 
System.out.println("outputs graph= " + responseAsString); - - JSONObject response = new JSONObject(responseAsString); - Assert.assertNotNull(response.get(AtlasClient.REQUEST_ID)); - - JSONObject results = response.getJSONObject(AtlasClient.RESULTS); - Assert.assertNotNull(results); - - JSONObject values = results.getJSONObject("values"); - Assert.assertNotNull(values); - - final JSONObject vertices = values.getJSONObject("vertices"); - Assert.assertEquals(vertices.length(), 3); - - final JSONObject edges = values.getJSONObject("edges"); - Assert.assertEquals(edges.length(), 4); - } - - @Test - public void testSchema() throws Exception { - WebResource resource = service.path(BASE_URI).path(salesFactTable).path("schema"); - - ClientResponse clientResponse = resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE) - .method(HttpMethod.GET, ClientResponse.class); - Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode()); - - String responseAsString = clientResponse.getEntity(String.class); - Assert.assertNotNull(responseAsString); - System.out.println("schema = " + responseAsString); - - JSONObject response = new JSONObject(responseAsString); - Assert.assertNotNull(response.get(AtlasClient.REQUEST_ID)); - - JSONObject results = response.getJSONObject(AtlasClient.RESULTS); - Assert.assertNotNull(results); - - JSONArray rows = results.getJSONArray("rows"); - Assert.assertEquals(rows.length(), 4); - - for (int index = 0; index < rows.length(); index++) { - final JSONObject row = rows.getJSONObject(index); - Assert.assertNotNull(row.getString("name")); - Assert.assertNotNull(row.getString("comment")); - Assert.assertNotNull(row.getString("dataType")); - Assert.assertEquals(row.getString("$typeName$"), "hive_column"); - } - } - - @Test - public void testSchemaForEmptyTable() throws Exception { - WebResource resource = service.path(BASE_URI).path("").path("schema"); - - ClientResponse clientResponse = 
resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE) - .method(HttpMethod.GET, ClientResponse.class); - Assert.assertEquals(clientResponse.getStatus(), Response.Status.NOT_FOUND.getStatusCode()); - } - - @Test - public void testSchemaForInvalidTable() throws Exception { - WebResource resource = service.path(BASE_URI).path("blah").path("schema"); - - ClientResponse clientResponse = resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE) - .method(HttpMethod.GET, ClientResponse.class); - Assert.assertEquals(clientResponse.getStatus(), Response.Status.NOT_FOUND.getStatusCode()); - } - - private void setupInstances() throws Exception { - Id salesDB = database("Sales" + randomString(), "Sales Database", "John ETL", - "hdfs://host:8000/apps/warehouse/sales"); - - List<Referenceable> salesFactColumns = ImmutableList - .of(column("time_id", "int", "time id"), column("product_id", "int", "product id"), - column("customer_id", "int", "customer id", "pii"), - column("sales", "double", "product id", "Metric")); - - salesFactTable = "sales_fact" + randomString(); - Id salesFact = table(salesFactTable, "sales fact table", salesDB, "Joe", "MANAGED", salesFactColumns, "Fact"); - - List<Referenceable> timeDimColumns = ImmutableList - .of(column("time_id", "int", "time id"), column("dayOfYear", "int", "day Of Year"), - column("weekDay", "int", "week Day")); - - Id timeDim = - table("time_dim" + randomString(), "time dimension table", salesDB, "John Doe", "EXTERNAL", - timeDimColumns, "Dimension"); - - Id reportingDB = - database("Reporting" + randomString(), "reporting database", "Jane BI", - "hdfs://host:8000/apps/warehouse/reporting"); - - Id salesFactDaily = - table("sales_fact_daily_mv" + randomString(), "sales fact daily materialized view", reportingDB, - "Joe BI", "MANAGED", salesFactColumns, "Metric"); - - String procName = "loadSalesDaily" + randomString(); - loadProcess(procName, "John ETL", ImmutableList.of(salesFact, timeDim), - 
ImmutableList.of(salesFactDaily), "create table as select ", "plan", "id", "graph", "ETL"); - - salesMonthlyTable = "sales_fact_monthly_mv" + randomString(); - Id salesFactMonthly = - table(salesMonthlyTable, "sales fact monthly materialized view", reportingDB, "Jane BI", - "MANAGED", salesFactColumns, "Metric"); - - loadProcess("loadSalesMonthly" + randomString(), "John ETL", ImmutableList.of(salesFactDaily), - ImmutableList.of(salesFactMonthly), "create table as select ", "plan", "id", "graph", "ETL"); - } - - Id database(String name, String description, String owner, String locationUri, String... traitNames) - throws Exception { - Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames); - referenceable.set("name", name); - referenceable.set("description", description); - referenceable.set("owner", owner); - referenceable.set("locationUri", locationUri); - referenceable.set("createTime", System.currentTimeMillis()); - - return createInstance(referenceable); - } - - Referenceable column(String name, String dataType, String comment, String... traitNames) throws Exception { - Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames); - referenceable.set("name", name); - referenceable.set("dataType", dataType); - referenceable.set("comment", comment); - - return referenceable; - } - - Id table(String name, String description, Id dbId, String owner, String tableType, List<Referenceable> columns, - String... 
traitNames) throws Exception { - Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames); - referenceable.set("name", name); - referenceable.set("description", description); - referenceable.set("owner", owner); - referenceable.set("tableType", tableType); - referenceable.set("createTime", System.currentTimeMillis()); - referenceable.set("lastAccessTime", System.currentTimeMillis()); - referenceable.set("retention", System.currentTimeMillis()); - - referenceable.set("db", dbId); - referenceable.set("columns", columns); - - return createInstance(referenceable); - } - - Id loadProcess(String name, String user, List<Id> inputTables, List<Id> outputTables, String queryText, - String queryPlan, String queryId, String queryGraph, String... traitNames) throws Exception { - Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames); - referenceable.set(AtlasClient.NAME, name); - referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name); - referenceable.set("user", user); - referenceable.set("startTime", System.currentTimeMillis()); - referenceable.set("endTime", System.currentTimeMillis() + 10000); - - referenceable.set("inputs", inputTables); - referenceable.set("outputs", outputTables); - - referenceable.set("queryText", queryText); - referenceable.set("queryPlan", queryPlan); - referenceable.set("queryId", queryId); - referenceable.set("queryGraph", queryGraph); - - return createInstance(referenceable); - } -}
