ATLAS-713 Entity lineage based on entity id (shwethags)
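This change replaces the Hive-specific lineage endpoints (api/atlas/lineage/hive/table/{name}/...) with generic dataset lineage endpoints keyed by entity id (api/atlas/lineage/{guid}/...), and adds matching methods to AtlasClient. As a quick orientation before the patch, here is a minimal sketch of the new client calls; the server URL, the single-argument AtlasClient constructor, and the GUID placeholder are illustrative assumptions — only the three *ForEntity methods and their paths come from the diff below.

    import org.apache.atlas.AtlasClient;
    import org.apache.atlas.AtlasServiceException;
    import org.codehaus.jettison.json.JSONObject;

    public class EntityLineageExample {
        public static void main(String[] args) throws AtlasServiceException {
            // Assumed: an Atlas server on localhost:21000 and a single-argument
            // AtlasClient constructor; replace <entity-guid> with the id of a
            // DataSet entity.
            AtlasClient client = new AtlasClient("http://localhost:21000");
            String guid = "<entity-guid>";

            JSONObject inputs  = client.getInputGraphForEntity(guid);   // GET api/atlas/lineage/{guid}/inputs/graph
            JSONObject outputs = client.getOutputGraphForEntity(guid);  // GET api/atlas/lineage/{guid}/outputs/graph
            JSONObject schema  = client.getSchemaForEntity(guid);       // GET api/atlas/lineage/{guid}/schema

            System.out.println(inputs);
        }
    }

Note that LINEAGE_INPUTS_GRAPH, LINEAGE_OUTPUTS_GRAPH and LINEAGE_SCHEMA all resolve to the same base path (BASE_URI + URI_LINEAGE), with the operation suffix passed separately to callAPI — which is why getSchemaForEntity still reaches the /schema URL even though it reuses the LINEAGE_OUTPUTS_GRAPH constant in this patch.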
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/b65dd91c Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/b65dd91c Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/b65dd91c Branch: refs/heads/master Commit: b65dd91c3587d35abafc4ec136e162f9a5c92ac1 Parents: 857561a Author: Shwetha GS <[email protected]> Authored: Wed May 18 17:55:24 2016 +0530 Committer: Shwetha GS <[email protected]> Committed: Wed May 18 17:55:24 2016 +0530 ---------------------------------------------------------------------- .../main/java/org/apache/atlas/AtlasClient.java | 41 +- dashboardv2/public/js/models/VLineage.js | 4 +- dashboardv2/public/js/models/VSchema.js | 4 +- .../views/detail_page/DetailPageLayoutView.js | 8 +- .../public/js/views/graph/LineageLayoutView.js | 4 +- .../public/js/views/schema/SchemaLayoutView.js | 2 +- distro/src/conf/atlas-application.properties | 10 +- release-log.txt | 1 + .../apache/atlas/RepositoryMetadataModule.java | 4 +- .../atlas/discovery/DataSetLineageService.java | 215 +++++++++ .../atlas/discovery/HiveLineageService.java | 222 --------- .../org/apache/atlas/query/ClosureQuery.scala | 44 +- .../apache/atlas/BaseHiveRepositoryTest.java | 377 ---------------- .../org/apache/atlas/BaseRepositoryTest.java | 377 ++++++++++++++++ .../discovery/DataSetLineageServiceTest.java | 447 +++++++++++++++++++ .../GraphBackedDiscoveryServiceTest.java | 4 +- .../atlas/discovery/HiveLineageServiceTest.java | 260 ----------- .../org/apache/atlas/query/GremlinTest2.scala | 8 +- .../apache/atlas/discovery/LineageService.java | 44 +- .../main/resources/atlas-application.properties | 8 +- .../web/resources/DataSetLineageResource.java | 162 +++++++ .../web/resources/HiveLineageResource.java | 166 ------- .../atlas/web/resources/LineageResource.java | 153 +++++++ .../DataSetLineageJerseyResourceIT.java | 306 +++++++++++++ .../resources/HiveLineageJerseyResourceIT.java | 257 ----------- 25 files changed, 1768 insertions(+), 1360 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/client/src/main/java/org/apache/atlas/AtlasClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java index b3ec95c..7e32cc2 100755 --- a/client/src/main/java/org/apache/atlas/AtlasClient.java +++ b/client/src/main/java/org/apache/atlas/AtlasClient.java @@ -90,7 +90,8 @@ public class AtlasClient { public static final String URI_ENTITY = "entities"; public static final String URI_ENTITY_AUDIT = "audit"; public static final String URI_SEARCH = "discovery/search"; - public static final String URI_LINEAGE = "lineage/hive/table"; + public static final String URI_NAME_LINEAGE = "lineage/hive/table"; + public static final String URI_LINEAGE = "lineage/"; public static final String URI_TRAITS = "traits"; public static final String QUERY = "query"; @@ -416,7 +417,12 @@ public class AtlasClient { SEARCH_GREMLIN(BASE_URI + URI_SEARCH + "/gremlin", HttpMethod.GET, Response.Status.OK), SEARCH_FULL_TEXT(BASE_URI + URI_SEARCH + "/fulltext", HttpMethod.GET, Response.Status.OK), - //Lineage operations + //Lineage operations based on dataset name + NAME_LINEAGE_INPUTS_GRAPH(BASE_URI + URI_NAME_LINEAGE, HttpMethod.GET, Response.Status.OK), + NAME_LINEAGE_OUTPUTS_GRAPH(BASE_URI + 
URI_NAME_LINEAGE, HttpMethod.GET, Response.Status.OK), + NAME_LINEAGE_SCHEMA(BASE_URI + URI_NAME_LINEAGE, HttpMethod.GET, Response.Status.OK), + + //Lineage operations based on entity id of the dataset LINEAGE_INPUTS_GRAPH(BASE_URI + URI_LINEAGE, HttpMethod.GET, Response.Status.OK), LINEAGE_OUTPUTS_GRAPH(BASE_URI + URI_LINEAGE, HttpMethod.GET, Response.Status.OK), LINEAGE_SCHEMA(BASE_URI + URI_LINEAGE, HttpMethod.GET, Response.Status.OK); @@ -988,7 +994,7 @@ public class AtlasClient { } public JSONObject getInputGraph(String datasetName) throws AtlasServiceException { - JSONObject response = callAPI(API.LINEAGE_INPUTS_GRAPH, null, datasetName, "/inputs/graph"); + JSONObject response = callAPI(API.NAME_LINEAGE_INPUTS_GRAPH, null, datasetName, "/inputs/graph"); try { return response.getJSONObject(AtlasClient.RESULTS); } catch (JSONException e) { @@ -997,7 +1003,34 @@ public class AtlasClient { } public JSONObject getOutputGraph(String datasetName) throws AtlasServiceException { - JSONObject response = callAPI(API.LINEAGE_OUTPUTS_GRAPH, null, datasetName, "/outputs/graph"); + JSONObject response = callAPI(API.NAME_LINEAGE_OUTPUTS_GRAPH, null, datasetName, "/outputs/graph"); + try { + return response.getJSONObject(AtlasClient.RESULTS); + } catch (JSONException e) { + throw new AtlasServiceException(e); + } + } + + public JSONObject getInputGraphForEntity(String entityId) throws AtlasServiceException { + JSONObject response = callAPI(API.LINEAGE_INPUTS_GRAPH, null, entityId, "/inputs/graph"); + try { + return response.getJSONObject(AtlasClient.RESULTS); + } catch (JSONException e) { + throw new AtlasServiceException(e); + } + } + + public JSONObject getOutputGraphForEntity(String datasetId) throws AtlasServiceException { + JSONObject response = callAPI(API.LINEAGE_OUTPUTS_GRAPH, null, datasetId, "/outputs/graph"); + try { + return response.getJSONObject(AtlasClient.RESULTS); + } catch (JSONException e) { + throw new AtlasServiceException(e); + } + } + + public JSONObject getSchemaForEntity(String datasetId) throws AtlasServiceException { + JSONObject response = callAPI(API.LINEAGE_OUTPUTS_GRAPH, null, datasetId, "/schema"); try { return response.getJSONObject(AtlasClient.RESULTS); } catch (JSONException e) { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/dashboardv2/public/js/models/VLineage.js ---------------------------------------------------------------------- diff --git a/dashboardv2/public/js/models/VLineage.js b/dashboardv2/public/js/models/VLineage.js index e33488a..fa1be05 100644 --- a/dashboardv2/public/js/models/VLineage.js +++ b/dashboardv2/public/js/models/VLineage.js @@ -23,7 +23,7 @@ define(['require', 'use strict'; var VLineage = VBaseModel.extend({ - urlRoot: Globals.baseURL + 'api/atlas/lineage/hive/table/assetName/outputs/graph', + urlRoot: Globals.baseURL + 'api/atlas/lineage/assetName/outputs/graph', defaults: {}, @@ -36,7 +36,7 @@ define(['require', this.bindErrorEvents(); }, toString: function() { - return this.get('name'); + return this.get('id'); }, }, {}); return VLineage; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/dashboardv2/public/js/models/VSchema.js ---------------------------------------------------------------------- diff --git a/dashboardv2/public/js/models/VSchema.js b/dashboardv2/public/js/models/VSchema.js index 1f8e0bb..24462e6 100644 --- a/dashboardv2/public/js/models/VSchema.js +++ b/dashboardv2/public/js/models/VSchema.js @@ -22,7 +22,7 @@ define(['require', ], function(require, Globals, VBaseModel) { 'use 
strict'; var VSchema = VBaseModel.extend({ - urlRoot: Globals.baseURL + '/api/atlas/lineage/hive/table/log_fact_daily_mv/schema', + urlRoot: Globals.baseURL + '/api/atlas/lineage/log_fact_daily_mv/schema', defaults: {}, @@ -35,7 +35,7 @@ define(['require', this.bindErrorEvents(); }, toString: function() { - return this.get('name'); + return this.get('id'); }, }, {}); return VSchema; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/dashboardv2/public/js/views/detail_page/DetailPageLayoutView.js ---------------------------------------------------------------------- diff --git a/dashboardv2/public/js/views/detail_page/DetailPageLayoutView.js b/dashboardv2/public/js/views/detail_page/DetailPageLayoutView.js index 87adec0..0932208 100644 --- a/dashboardv2/public/js/views/detail_page/DetailPageLayoutView.js +++ b/dashboardv2/public/js/views/detail_page/DetailPageLayoutView.js @@ -92,7 +92,7 @@ define(['require', this.renderEntityDetailTableLayoutView(); this.renderTagTableLayoutView(tagGuid); this.renderLineageLayoutView(tagGuid); - this.renderSchemaLayoutView(); + this.renderSchemaLayoutView(tagGuid); }, this); }, onRender: function() {}, @@ -120,17 +120,17 @@ define(['require', require(['views/graph/LineageLayoutView'], function(LineageLayoutView) { that.RLineageLayoutView.show(new LineageLayoutView({ globalVent: that.globalVent, - assetName: that.name, + assetName: tagGuid, guid: tagGuid })); }); }, - renderSchemaLayoutView: function() { + renderSchemaLayoutView: function(tagGuid) { var that = this; require(['views/schema/SchemaLayoutView'], function(SchemaLayoutView) { that.RSchemaTableLayoutView.show(new SchemaLayoutView({ globalVent: that.globalVent, - name: that.name, + name: tagGuid, vent: that.vent })); }); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/dashboardv2/public/js/views/graph/LineageLayoutView.js ---------------------------------------------------------------------- diff --git a/dashboardv2/public/js/views/graph/LineageLayoutView.js b/dashboardv2/public/js/views/graph/LineageLayoutView.js index 973d091..31433c1 100644 --- a/dashboardv2/public/js/views/graph/LineageLayoutView.js +++ b/dashboardv2/public/js/views/graph/LineageLayoutView.js @@ -56,8 +56,8 @@ define(['require', this.inputCollection = new VLineageList(); this.outputCollection = new VLineageList(); this.entityModel = new VEntity(); - this.inputCollection.url = "/api/atlas/lineage/hive/table/" + this.assetName + "/inputs/graph"; - this.outputCollection.url = "/api/atlas/lineage/hive/table/" + this.assetName + "/outputs/graph"; + this.inputCollection.url = "/api/atlas/lineage/" + this.assetName + "/inputs/graph"; + this.outputCollection.url = "/api/atlas/lineage/" + this.assetName + "/outputs/graph"; this.bindEvents(); this.fetchGraphData(); this.data = {}; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/dashboardv2/public/js/views/schema/SchemaLayoutView.js ---------------------------------------------------------------------- diff --git a/dashboardv2/public/js/views/schema/SchemaLayoutView.js b/dashboardv2/public/js/views/schema/SchemaLayoutView.js index de558a7..301b993 100644 --- a/dashboardv2/public/js/views/schema/SchemaLayoutView.js +++ b/dashboardv2/public/js/views/schema/SchemaLayoutView.js @@ -73,7 +73,7 @@ define(['require', initialize: function(options) { _.extend(this, _.pick(options, 'globalVent', 'name', 'vent')); this.schemaCollection = new VSchemaList([], {}); - this.schemaCollection.url = "/api/atlas/lineage/hive/table/" + this.name + 
"/schema"; + this.schemaCollection.url = "/api/atlas/lineage/" + this.name + "/schema"; this.commonTableOptions = { collection: this.schemaCollection, includeFilter: false, http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/distro/src/conf/atlas-application.properties ---------------------------------------------------------------------- diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties index 68a0021..d4722fb 100755 --- a/distro/src/conf/atlas-application.properties +++ b/distro/src/conf/atlas-application.properties @@ -63,15 +63,9 @@ atlas.kafka.auto.commit.enable=false ######### Hive Lineage Configs ######### -# This models reflects the base super types for Data and Process -#atlas.lineage.hive.table.type.name=DataSet -#atlas.lineage.hive.process.type.name=Process -#atlas.lineage.hive.process.inputs.name=inputs -#atlas.lineage.hive.process.outputs.name=outputs - ## Schema -atlas.lineage.hive.table.schema.query.hive_table=hive_table where name='%s'\, columns -atlas.lineage.hive.table.schema.query.Table=Table where name='%s'\, columns +atlas.lineage.schema.query.hive_table=hive_table where __guid='%s'\, columns +atlas.lineage.schema.query.Table=Table where __guid='%s'\, columns ## Server port configuration #atlas.server.http.port=21000 http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index b600fff..a68010a 100644 --- a/release-log.txt +++ b/release-log.txt @@ -21,6 +21,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags) ALL CHANGES: +ATLAS-713 Entity lineage based on entity id (shwethags) ATLAS-736 UI - BUG :: displaying timestamp values for hive_db description (kevalbhatt18 via yhemanth) ATLAS-784 Configure config.store.uri for Falcon hook IT (yhemanth) ATLAS-645 FieldMapping.output() results in stack overflow when instances reference each other (dkantor via shwethags) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java index 8dae968..68b707f 100755 --- a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java +++ b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java @@ -26,7 +26,7 @@ import com.google.inject.throwingproviders.ThrowingProviderBinder; import com.thinkaurelius.titan.core.TitanGraph; import org.aopalliance.intercept.MethodInterceptor; import org.apache.atlas.discovery.DiscoveryService; -import org.apache.atlas.discovery.HiveLineageService; +import org.apache.atlas.discovery.DataSetLineageService; import org.apache.atlas.discovery.LineageService; import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService; import org.apache.atlas.listener.EntityChangeListener; @@ -83,7 +83,7 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule { // bind the DiscoveryService interface to an implementation bind(DiscoveryService.class).to(GraphBackedDiscoveryService.class).asEagerSingleton(); - 
bind(LineageService.class).to(HiveLineageService.class).asEagerSingleton(); + bind(LineageService.class).to(DataSetLineageService.class).asEagerSingleton(); bindAuditRepository(binder()); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java new file mode 100644 index 0000000..39dde2a --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.discovery; + +import com.thinkaurelius.titan.core.TitanGraph; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasClient; +import org.apache.atlas.AtlasException; +import org.apache.atlas.GraphTransaction; +import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy; +import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService; +import org.apache.atlas.query.GremlinQueryResult; +import org.apache.atlas.query.InputLineageClosureQuery; +import org.apache.atlas.query.OutputLineageClosureQuery; +import org.apache.atlas.repository.MetadataRepository; +import org.apache.atlas.repository.graph.GraphProvider; +import org.apache.atlas.typesystem.exception.EntityNotFoundException; +import org.apache.atlas.typesystem.persistence.ReferenceableInstance; +import org.apache.atlas.utils.ParamChecker; +import org.apache.commons.configuration.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.Some; +import scala.collection.immutable.List; + +import javax.inject.Inject; +import javax.inject.Singleton; + +/** + * Hive implementation of Lineage service interface. 
+ */ +@Singleton +public class DataSetLineageService implements LineageService { + + private static final Logger LOG = LoggerFactory.getLogger(DataSetLineageService.class); + + private static final Option<List<String>> SELECT_ATTRIBUTES = + Some.<List<String>>apply(List.<String>fromArray(new String[]{"name"})); + public static final String SELECT_INSTANCE_GUID = "__guid"; + + public static final String DATASET_SCHEMA_QUERY_PREFIX = "atlas.lineage.schema.query."; + + private static final String HIVE_PROCESS_TYPE_NAME = "Process"; + private static final String HIVE_PROCESS_INPUT_ATTRIBUTE_NAME = "inputs"; + private static final String HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME = "outputs"; + + private static final String DATASET_EXISTS_QUERY = AtlasClient.DATA_SET_SUPER_TYPE + " where __guid = '%s'"; + private static final String DATASET_NAME_EXISTS_QUERY = + AtlasClient.DATA_SET_SUPER_TYPE + " where name = '%s' and __state = 'ACTIVE'"; + + private static final Configuration propertiesConf; + + static { + try { + propertiesConf = ApplicationProperties.get(); + } catch (AtlasException e) { + throw new RuntimeException(e); + } + } + + + private final TitanGraph titanGraph; + private final DefaultGraphPersistenceStrategy graphPersistenceStrategy; + private final GraphBackedDiscoveryService discoveryService; + + @Inject + DataSetLineageService(GraphProvider<TitanGraph> graphProvider, MetadataRepository metadataRepository, + GraphBackedDiscoveryService discoveryService) throws DiscoveryException { + this.titanGraph = graphProvider.get(); + this.graphPersistenceStrategy = new DefaultGraphPersistenceStrategy(metadataRepository); + this.discoveryService = discoveryService; + } + + /** + * Return the lineage outputs graph for the given datasetName. + * + * @param datasetName datasetName + * @return Outputs Graph as JSON + */ + @Override + @GraphTransaction + public String getOutputsGraph(String datasetName) throws AtlasException { + LOG.info("Fetching lineage outputs graph for datasetName={}", datasetName); + ParamChecker.notEmpty(datasetName, "dataset name"); + ReferenceableInstance datasetInstance = validateDatasetNameExists(datasetName); + return getOutputsGraphForId(datasetInstance.getId()._getId()); + } + + /** + * Return the lineage inputs graph for the given tableName. 
+ * + * @param tableName tableName + * @return Inputs Graph as JSON + */ + @Override + @GraphTransaction + public String getInputsGraph(String tableName) throws AtlasException { + LOG.info("Fetching lineage inputs graph for tableName={}", tableName); + ParamChecker.notEmpty(tableName, "table name"); + ReferenceableInstance datasetInstance = validateDatasetNameExists(tableName); + return getInputsGraphForId(datasetInstance.getId()._getId()); + } + + @Override + public String getInputsGraphForEntity(String guid) throws AtlasException { + LOG.info("Fetching lineage inputs graph for entity={}", guid); + ParamChecker.notEmpty(guid, "Entity id"); + validateDatasetExists(guid); + return getInputsGraphForId(guid); + } + + private String getInputsGraphForId(String guid) { + InputLineageClosureQuery + inputsQuery = new InputLineageClosureQuery(AtlasClient.DATA_SET_SUPER_TYPE, SELECT_INSTANCE_GUID, + guid, HIVE_PROCESS_TYPE_NAME, + HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(), + SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph); + return inputsQuery.graph().toInstanceJson(); + } + + @Override + public String getOutputsGraphForEntity(String guid) throws AtlasException { + LOG.info("Fetching lineage outputs graph for entity guid={}", guid); + ParamChecker.notEmpty(guid, "Entity id"); + validateDatasetExists(guid); + return getOutputsGraphForId(guid); + } + + private String getOutputsGraphForId(String guid) { + OutputLineageClosureQuery outputsQuery = + new OutputLineageClosureQuery(AtlasClient.DATA_SET_SUPER_TYPE, SELECT_INSTANCE_GUID, guid, HIVE_PROCESS_TYPE_NAME, + HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(), + SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph); + return outputsQuery.graph().toInstanceJson(); + } + + /** + * Return the schema for the given tableName. + * + * @param datasetName tableName + * @return Schema as JSON + */ + @Override + @GraphTransaction + public String getSchema(String datasetName) throws AtlasException { + ParamChecker.notEmpty(datasetName, "table name"); + LOG.info("Fetching schema for tableName={}", datasetName); + ReferenceableInstance datasetInstance = validateDatasetNameExists(datasetName); + + return getSchemaForId(datasetInstance.getTypeName(), datasetInstance.getId()._getId()); + } + + private String getSchemaForId(String typeName, String guid) throws DiscoveryException { + final String schemaQuery = + String.format(propertiesConf.getString(DATASET_SCHEMA_QUERY_PREFIX + typeName), guid); + return discoveryService.searchByDSL(schemaQuery); + } + + @Override + public String getSchemaForEntity(String guid) throws AtlasException { + ParamChecker.notEmpty(guid, "Entity id"); + LOG.info("Fetching schema for entity guid={}", guid); + String typeName = validateDatasetExists(guid); + return getSchemaForId(typeName, guid); + } + + /** + * Validate if indeed this is a table type and exists. + * + * @param datasetName table name + */ + private ReferenceableInstance validateDatasetNameExists(String datasetName) throws AtlasException { + final String tableExistsQuery = String.format(DATASET_NAME_EXISTS_QUERY, datasetName); + GremlinQueryResult queryResult = discoveryService.evaluate(tableExistsQuery); + if (!(queryResult.rows().length() > 0)) { + throw new EntityNotFoundException(datasetName + " does not exist"); + } + + return (ReferenceableInstance)queryResult.rows().apply(0); + } + + /** + * Validate if indeed this is a table type and exists. 
+ * + * @param guid entity id + */ + private String validateDatasetExists(String guid) throws AtlasException { + final String datasetExistsQuery = String.format(DATASET_EXISTS_QUERY, guid); + GremlinQueryResult queryResult = discoveryService.evaluate(datasetExistsQuery); + if (!(queryResult.rows().length() > 0)) { + throw new EntityNotFoundException("Dataset with guid = " + guid + " does not exist"); + } + + ReferenceableInstance referenceable = (ReferenceableInstance)queryResult.rows().apply(0); + return referenceable.getTypeName(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/main/java/org/apache/atlas/discovery/HiveLineageService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/HiveLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/HiveLineageService.java deleted file mode 100644 index 00905d7..0000000 --- a/repository/src/main/java/org/apache/atlas/discovery/HiveLineageService.java +++ /dev/null @@ -1,222 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.discovery; - -import com.thinkaurelius.titan.core.TitanGraph; -import org.apache.atlas.ApplicationProperties; -import org.apache.atlas.AtlasException; -import org.apache.atlas.GraphTransaction; -import org.apache.atlas.typesystem.exception.EntityNotFoundException; -import org.apache.atlas.utils.ParamChecker; -import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy; -import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService; -import org.apache.atlas.query.Expressions; -import org.apache.atlas.query.GremlinQueryResult; -import org.apache.atlas.query.HiveLineageQuery; -import org.apache.atlas.query.HiveWhereUsedQuery; -import org.apache.atlas.repository.MetadataRepository; -import org.apache.atlas.repository.graph.GraphProvider; -import org.apache.atlas.typesystem.persistence.ReferenceableInstance; -import org.apache.commons.configuration.Configuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Option; -import scala.Some; -import scala.collection.immutable.List; - -import javax.inject.Inject; -import javax.inject.Singleton; - -/** - * Hive implementation of Lineage service interface. 
- */ -@Singleton -public class HiveLineageService implements LineageService { - - private static final Logger LOG = LoggerFactory.getLogger(HiveLineageService.class); - - private static final Option<List<String>> SELECT_ATTRIBUTES = - Some.<List<String>>apply(List.<String>fromArray(new String[]{"name"})); - - public static final String HIVE_TABLE_SCHEMA_QUERY_PREFIX = "atlas.lineage.hive.table.schema.query."; - - private static final String HIVE_TABLE_TYPE_NAME; - private static final String HIVE_PROCESS_TYPE_NAME; - private static final String HIVE_PROCESS_INPUT_ATTRIBUTE_NAME; - private static final String HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME; - - private static final String HIVE_TABLE_EXISTS_QUERY; - - private static final Configuration propertiesConf; - - static { - // todo - externalize this using type system - dog food - try { - propertiesConf = ApplicationProperties.get(); - HIVE_TABLE_TYPE_NAME = propertiesConf.getString("atlas.lineage.hive.table.type.name", "DataSet"); - HIVE_PROCESS_TYPE_NAME = propertiesConf.getString("atlas.lineage.hive.process.type.name", "Process"); - HIVE_PROCESS_INPUT_ATTRIBUTE_NAME = propertiesConf.getString("atlas.lineage.hive.process.inputs.name", "inputs"); - HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME = propertiesConf.getString("atlas.lineage.hive.process.outputs.name", "outputs"); - - HIVE_TABLE_EXISTS_QUERY = propertiesConf.getString("atlas.lineage.hive.table.exists.query", - "from " + HIVE_TABLE_TYPE_NAME + " where name=\"%s\""); - } catch (AtlasException e) { - throw new RuntimeException(e); - } - } - - - private final TitanGraph titanGraph; - private final DefaultGraphPersistenceStrategy graphPersistenceStrategy; - private final GraphBackedDiscoveryService discoveryService; - - @Inject - HiveLineageService(GraphProvider<TitanGraph> graphProvider, MetadataRepository metadataRepository, - GraphBackedDiscoveryService discoveryService) throws DiscoveryException { - this.titanGraph = graphProvider.get(); - this.graphPersistenceStrategy = new DefaultGraphPersistenceStrategy(metadataRepository); - this.discoveryService = discoveryService; - } - - /** - * Return the lineage outputs for the given tableName. - * - * @param tableName tableName - * @return Lineage Outputs as JSON - */ - @Override - @GraphTransaction - public String getOutputs(String tableName) throws AtlasException { - LOG.info("Fetching lineage outputs for tableName={}", tableName); - ParamChecker.notEmpty(tableName, "table name cannot be null"); - validateTableExists(tableName); - - HiveWhereUsedQuery outputsQuery = - new HiveWhereUsedQuery(HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME, - HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(), - SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph); - - Expressions.Expression expression = outputsQuery.expr(); - LOG.debug("Expression is [" + expression.toString() + "]"); - try { - return discoveryService.evaluate(expression).toJson(); - } catch (Exception e) { // unable to catch ExpressionException - throw new DiscoveryException("Invalid expression [" + expression.toString() + "]", e); - } - } - - /** - * Return the lineage outputs graph for the given tableName. 
- * - * @param tableName tableName - * @return Outputs Graph as JSON - */ - @Override - @GraphTransaction - public String getOutputsGraph(String tableName) throws AtlasException { - LOG.info("Fetching lineage outputs graph for tableName={}", tableName); - ParamChecker.notEmpty(tableName, "table name cannot be null"); - validateTableExists(tableName); - - HiveWhereUsedQuery outputsQuery = - new HiveWhereUsedQuery(HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME, - HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(), - SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph); - return outputsQuery.graph().toInstanceJson(); - } - - /** - * Return the lineage inputs for the given tableName. - * - * @param tableName tableName - * @return Lineage Inputs as JSON - */ - @Override - @GraphTransaction - public String getInputs(String tableName) throws AtlasException { - LOG.info("Fetching lineage inputs for tableName={}", tableName); - ParamChecker.notEmpty(tableName, "table name cannot be null"); - validateTableExists(tableName); - - HiveLineageQuery inputsQuery = new HiveLineageQuery(HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME, - HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(), - SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph); - - Expressions.Expression expression = inputsQuery.expr(); - LOG.debug("Expression is [" + expression.toString() + "]"); - try { - return discoveryService.evaluate(expression).toJson(); - } catch (Exception e) { // unable to catch ExpressionException - throw new DiscoveryException("Invalid expression [" + expression.toString() + "]", e); - } - } - - /** - * Return the lineage inputs graph for the given tableName. - * - * @param tableName tableName - * @return Inputs Graph as JSON - */ - @Override - @GraphTransaction - public String getInputsGraph(String tableName) throws AtlasException { - LOG.info("Fetching lineage inputs graph for tableName={}", tableName); - ParamChecker.notEmpty(tableName, "table name cannot be null"); - validateTableExists(tableName); - - HiveLineageQuery inputsQuery = new HiveLineageQuery(HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME, - HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(), - SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph); - return inputsQuery.graph().toInstanceJson(); - } - - /** - * Return the schema for the given tableName. - * - * @param tableName tableName - * @return Schema as JSON - */ - @Override - @GraphTransaction - public String getSchema(String tableName) throws AtlasException { - LOG.info("Fetching schema for tableName={}", tableName); - ParamChecker.notEmpty(tableName, "table name cannot be null"); - String typeName = validateTableExists(tableName); - - final String schemaQuery = - String.format(propertiesConf.getString(HIVE_TABLE_SCHEMA_QUERY_PREFIX + typeName), tableName); - return discoveryService.searchByDSL(schemaQuery); - } - - /** - * Validate if indeed this is a table type and exists. 
- * - * @param tableName table name - */ - private String validateTableExists(String tableName) throws AtlasException { - final String tableExistsQuery = String.format(HIVE_TABLE_EXISTS_QUERY, tableName); - GremlinQueryResult queryResult = discoveryService.evaluate(tableExistsQuery); - if (!(queryResult.rows().length() > 0)) { - throw new EntityNotFoundException(tableName + " does not exist"); - } - - ReferenceableInstance referenceable = (ReferenceableInstance)queryResult.rows().apply(0); - return referenceable.getTypeName(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala ---------------------------------------------------------------------- diff --git a/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala b/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala index 05dc6a4..c4621cd 100755 --- a/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala +++ b/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala @@ -256,21 +256,21 @@ trait SingleInstanceClosureQuery[T] extends ClosureQuery { * @param persistenceStrategy as needed to evaluate the Closure Query. * @param g as needed to evaluate the Closure Query. */ -case class HiveLineageQuery(tableTypeName : String, - tableName : String, - ctasTypeName : String, - ctasInputTableAttribute : String, - ctasOutputTableAttribute : String, - depth : Option[Int], - selectAttributes : Option[List[String]], - withPath : Boolean, - persistenceStrategy: GraphPersistenceStrategies, - g: TitanGraph +case class InputLineageClosureQuery(tableTypeName : String, + attributeToSelectInstance : String, + tableName : String, + ctasTypeName : String, + ctasInputTableAttribute : String, + ctasOutputTableAttribute : String, + depth : Option[Int], + selectAttributes : Option[List[String]], + withPath : Boolean, + persistenceStrategy: GraphPersistenceStrategies, + g: TitanGraph ) extends SingleInstanceClosureQuery[String] { val closureType : String = tableTypeName - val attributeToSelectInstance = "name" val attributeTyp = DataTypes.STRING_TYPE val instanceValue = tableName @@ -296,21 +296,21 @@ case class HiveLineageQuery(tableTypeName : String, * @param persistenceStrategy as needed to evaluate the Closure Query. * @param g as needed to evaluate the Closure Query. 
*/ -case class HiveWhereUsedQuery(tableTypeName : String, - tableName : String, - ctasTypeName : String, - ctasInputTableAttribute : String, - ctasOutputTableAttribute : String, - depth : Option[Int], - selectAttributes : Option[List[String]], - withPath : Boolean, - persistenceStrategy: GraphPersistenceStrategies, - g: TitanGraph +case class OutputLineageClosureQuery(tableTypeName : String, + attributeToSelectInstance : String, + tableName : String, + ctasTypeName : String, + ctasInputTableAttribute : String, + ctasOutputTableAttribute : String, + depth : Option[Int], + selectAttributes : Option[List[String]], + withPath : Boolean, + persistenceStrategy: GraphPersistenceStrategies, + g: TitanGraph ) extends SingleInstanceClosureQuery[String] { val closureType : String = tableTypeName - val attributeToSelectInstance = "name" val attributeTyp = DataTypes.STRING_TYPE val instanceValue = tableName http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java b/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java deleted file mode 100644 index 40f0d91..0000000 --- a/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java +++ /dev/null @@ -1,377 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.atlas; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.thinkaurelius.titan.core.TitanGraph; -import com.thinkaurelius.titan.core.util.TitanCleanup; - -import org.apache.atlas.repository.MetadataRepository; -import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; -import org.apache.atlas.repository.graph.GraphProvider; -import org.apache.atlas.services.MetadataService; -import org.apache.atlas.typesystem.ITypedReferenceableInstance; -import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.typesystem.TypesDef; -import org.apache.atlas.typesystem.json.TypesSerialization; -import org.apache.atlas.typesystem.persistence.Id; -import org.apache.atlas.typesystem.types.AttributeDefinition; -import org.apache.atlas.typesystem.types.ClassType; -import org.apache.atlas.typesystem.types.DataTypes; -import org.apache.atlas.typesystem.types.EnumTypeDefinition; -import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition; -import org.apache.atlas.typesystem.types.IDataType; -import org.apache.atlas.typesystem.types.Multiplicity; -import org.apache.atlas.typesystem.types.StructTypeDefinition; -import org.apache.atlas.typesystem.types.TraitType; -import org.apache.atlas.typesystem.types.TypeSystem; -import org.apache.atlas.typesystem.types.utils.TypesUtil; -import org.testng.annotations.Guice; - -import javax.inject.Inject; - -import java.util.ArrayList; -import java.util.Date; -import java.util.List; - -/** - * Base Class to set up hive types and instances for tests - */ -@Guice(modules = RepositoryMetadataModule.class) -public class BaseHiveRepositoryTest { - - @Inject - protected MetadataService metadataService; - - @Inject - protected MetadataRepository repository; - - @Inject - protected GraphProvider<TitanGraph> graphProvider; - - protected void setUp() throws Exception { - setUpTypes(); - new GraphBackedSearchIndexer(graphProvider); - RequestContext.createContext(); - setupInstances(); - TestUtils.dumpGraph(graphProvider.get()); - } - - protected void tearDown() throws Exception { - TypeSystem.getInstance().reset(); - try { - graphProvider.get().shutdown(); - } catch (Exception e) { - e.printStackTrace(); - } - try { - TitanCleanup.clear(graphProvider.get()); - } catch (Exception e) { - e.printStackTrace(); - } - } - - private void setUpTypes() throws Exception { - TypesDef typesDef = createTypeDefinitions(); - String typesAsJSON = TypesSerialization.toJson(typesDef); - metadataService.createType(typesAsJSON); - } - - private static final String DATABASE_TYPE = "hive_db"; - private static final String HIVE_TABLE_TYPE = "hive_table"; - private static final String COLUMN_TYPE = "hive_column"; - private static final String HIVE_PROCESS_TYPE = "hive_process"; - private static final String STORAGE_DESC_TYPE = "StorageDesc"; - private static final String VIEW_TYPE = "View"; - private static final String PARTITION_TYPE = "hive_partition"; - - TypesDef createTypeDefinitions() { - HierarchicalTypeDefinition<ClassType> dbClsDef = TypesUtil - .createClassTypeDef(DATABASE_TYPE, null, attrDef("name", DataTypes.STRING_TYPE), - attrDef("description", DataTypes.STRING_TYPE), attrDef("locationUri", DataTypes.STRING_TYPE), - attrDef("owner", DataTypes.STRING_TYPE), attrDef("createTime", DataTypes.LONG_TYPE)); - - HierarchicalTypeDefinition<ClassType> columnClsDef = TypesUtil - .createClassTypeDef(COLUMN_TYPE, null, attrDef("name", 
DataTypes.STRING_TYPE), - attrDef("dataType", DataTypes.STRING_TYPE), attrDef("comment", DataTypes.STRING_TYPE)); - - HierarchicalTypeDefinition<ClassType> storageDescClsDef = TypesUtil - .createClassTypeDef(STORAGE_DESC_TYPE, null, - attrDef("location", DataTypes.STRING_TYPE), - attrDef("inputFormat", DataTypes.STRING_TYPE), attrDef("outputFormat", DataTypes.STRING_TYPE), - attrDef("compressed", DataTypes.STRING_TYPE, Multiplicity.REQUIRED, false, null)); - - - HierarchicalTypeDefinition<ClassType> tblClsDef = TypesUtil - .createClassTypeDef(HIVE_TABLE_TYPE, ImmutableSet.of("DataSet"), - attrDef("owner", DataTypes.STRING_TYPE), - attrDef("createTime", DataTypes.DATE_TYPE), - attrDef("lastAccessTime", DataTypes.LONG_TYPE), attrDef("tableType", DataTypes.STRING_TYPE), - attrDef("temporary", DataTypes.BOOLEAN_TYPE), - new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null), - // todo - uncomment this, something is broken - new AttributeDefinition("sd", STORAGE_DESC_TYPE, - Multiplicity.REQUIRED, true, null), - new AttributeDefinition("columns", DataTypes.arrayTypeName(COLUMN_TYPE), - Multiplicity.COLLECTION, true, null)); - - HierarchicalTypeDefinition<ClassType> loadProcessClsDef = TypesUtil - .createClassTypeDef(HIVE_PROCESS_TYPE, ImmutableSet.of("Process"), - attrDef("userName", DataTypes.STRING_TYPE), attrDef("startTime", DataTypes.LONG_TYPE), - attrDef("endTime", DataTypes.LONG_TYPE), - attrDef("queryText", DataTypes.STRING_TYPE, Multiplicity.REQUIRED), - attrDef("queryPlan", DataTypes.STRING_TYPE, Multiplicity.REQUIRED), - attrDef("queryId", DataTypes.STRING_TYPE, Multiplicity.REQUIRED), - attrDef("queryGraph", DataTypes.STRING_TYPE, Multiplicity.REQUIRED)); - - HierarchicalTypeDefinition<ClassType> viewClsDef = TypesUtil - .createClassTypeDef(VIEW_TYPE, null, attrDef("name", DataTypes.STRING_TYPE), - new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null), - new AttributeDefinition("inputTables", DataTypes.arrayTypeName(HIVE_TABLE_TYPE), - Multiplicity.COLLECTION, false, null)); - - AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ - new AttributeDefinition("values", DataTypes.arrayTypeName(DataTypes.STRING_TYPE.getName()), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("table", HIVE_TABLE_TYPE, Multiplicity.REQUIRED, false, null), - }; - HierarchicalTypeDefinition<ClassType> partClsDef = - new HierarchicalTypeDefinition<>(ClassType.class, PARTITION_TYPE, null, null, - attributeDefinitions); - - HierarchicalTypeDefinition<TraitType> dimTraitDef = TypesUtil.createTraitTypeDef("Dimension", null); - - HierarchicalTypeDefinition<TraitType> factTraitDef = TypesUtil.createTraitTypeDef("Fact", null); - - HierarchicalTypeDefinition<TraitType> metricTraitDef = TypesUtil.createTraitTypeDef("Metric", null); - - HierarchicalTypeDefinition<TraitType> etlTraitDef = TypesUtil.createTraitTypeDef("ETL", null); - - HierarchicalTypeDefinition<TraitType> piiTraitDef = TypesUtil.createTraitTypeDef("PII", null); - - HierarchicalTypeDefinition<TraitType> jdbcTraitDef = TypesUtil.createTraitTypeDef("JdbcAccess", null); - - HierarchicalTypeDefinition<TraitType> logTraitDef = TypesUtil.createTraitTypeDef("Log Data", null); - - return TypesUtil.getTypesDef(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(), - ImmutableList.of(dimTraitDef, factTraitDef, piiTraitDef, metricTraitDef, etlTraitDef, jdbcTraitDef, logTraitDef), - ImmutableList.of(dbClsDef, storageDescClsDef, columnClsDef, tblClsDef, 
loadProcessClsDef, viewClsDef, partClsDef)); - } - - AttributeDefinition attrDef(String name, IDataType dT) { - return attrDef(name, dT, Multiplicity.OPTIONAL, false, null); - } - - AttributeDefinition attrDef(String name, IDataType dT, Multiplicity m) { - return attrDef(name, dT, m, false, null); - } - - AttributeDefinition attrDef(String name, IDataType dT, Multiplicity m, boolean isComposite, - String reverseAttributeName) { - Preconditions.checkNotNull(name); - Preconditions.checkNotNull(dT); - return new AttributeDefinition(name, dT.getName(), m, isComposite, reverseAttributeName); - } - - private void setupInstances() throws Exception { - Id salesDB = database("Sales", "Sales Database", "John ETL", "hdfs://host:8000/apps/warehouse/sales"); - - Referenceable sd = - storageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true, ImmutableList.of( - column("time_id", "int", "time id"))); - - List<Referenceable> salesFactColumns = ImmutableList - .of(column("time_id", "int", "time id"), - column("product_id", "int", "product id"), - column("customer_id", "int", "customer id", "PII"), - column("sales", "double", "product id", "Metric")); - - Id salesFact = table("sales_fact", "sales fact table", salesDB, sd, "Joe", "Managed", salesFactColumns, "Fact"); - - List<Referenceable> logFactColumns = ImmutableList - .of(column("time_id", "int", "time id"), column("app_id", "int", "app id"), - column("machine_id", "int", "machine id"), column("log", "string", "log data", "Log Data")); - - List<Referenceable> timeDimColumns = ImmutableList - .of(column("time_id", "int", "time id"), - column("dayOfYear", "int", "day Of Year"), - column("weekDay", "int", "week Day")); - - Id timeDim = table("time_dim", "time dimension table", salesDB, sd, "John Doe", "External", timeDimColumns, - "Dimension"); - - Id reportingDB = - database("Reporting", "reporting database", "Jane BI", "hdfs://host:8000/apps/warehouse/reporting"); - - Id salesFactDaily = - table("sales_fact_daily_mv", "sales fact daily materialized view", reportingDB, sd, "Joe BI", "Managed", - salesFactColumns, "Metric"); - - loadProcess("loadSalesDaily", "hive query for daily summary", "John ETL", ImmutableList.of(salesFact, timeDim), - ImmutableList.of(salesFactDaily), "create table as select ", "plan", "id", "graph", "ETL"); - - Id logDB = database("Logging", "logging database", "Tim ETL", "hdfs://host:8000/apps/warehouse/logging"); - - Id loggingFactDaily = - table("log_fact_daily_mv", "log fact daily materialized view", logDB, sd, "Tim ETL", "Managed", - logFactColumns, "Log Data"); - - List<Referenceable> productDimColumns = ImmutableList - .of(column("product_id", "int", "product id"), - column("product_name", "string", "product name"), - column("brand_name", "int", "brand name")); - - Id productDim = - table("product_dim", "product dimension table", salesDB, sd, "John Doe", "Managed", productDimColumns, - "Dimension"); - - view("product_dim_view", reportingDB, ImmutableList.of(productDim), "Dimension", "JdbcAccess"); - - List<Referenceable> customerDimColumns = ImmutableList.of( - column("customer_id", "int", "customer id", "PII"), - column("name", "string", "customer name", "PII"), - column("address", "string", "customer address", "PII")); - - Id customerDim = - table("customer_dim", "customer dimension table", salesDB, sd, "fetl", "External", customerDimColumns, - "Dimension"); - - view("customer_dim_view", reportingDB, ImmutableList.of(customerDim), "Dimension", "JdbcAccess"); - - Id 
salesFactMonthly = - table("sales_fact_monthly_mv", "sales fact monthly materialized view", reportingDB, sd, "Jane BI", - "Managed", salesFactColumns, "Metric"); - - loadProcess("loadSalesMonthly", "hive query for monthly summary", "John ETL", ImmutableList.of(salesFactDaily), - ImmutableList.of(salesFactMonthly), "create table as select ", "plan", "id", "graph", "ETL"); - - Id loggingFactMonthly = - table("logging_fact_monthly_mv", "logging fact monthly materialized view", logDB, sd, "Tim ETL", - "Managed", logFactColumns, "Log Data"); - - loadProcess("loadLogsMonthly", "hive query for monthly summary", "Tim ETL", ImmutableList.of(loggingFactDaily), - ImmutableList.of(loggingFactMonthly), "create table as select ", "plan", "id", "graph", "ETL"); - - partition(new ArrayList() {{ add("2015-01-01"); }}, salesFactDaily); - } - - Id database(String name, String description, String owner, String locationUri, String... traitNames) - throws Exception { - Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames); - referenceable.set("name", name); - referenceable.set("description", description); - referenceable.set("owner", owner); - referenceable.set("locationUri", locationUri); - referenceable.set("createTime", System.currentTimeMillis()); - - ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, DATABASE_TYPE); - return createInstance(referenceable, clsType); - } - - Referenceable storageDescriptor(String location, String inputFormat, String outputFormat, boolean compressed, List<Referenceable> columns) - throws Exception { - Referenceable referenceable = new Referenceable(STORAGE_DESC_TYPE); - referenceable.set("location", location); - referenceable.set("inputFormat", inputFormat); - referenceable.set("outputFormat", outputFormat); - referenceable.set("compressed", compressed); - referenceable.set("cols", columns); - - return referenceable; - } - - Referenceable column(String name, String dataType, String comment, String... traitNames) throws Exception { - Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames); - referenceable.set("name", name); - referenceable.set("dataType", dataType); - referenceable.set("comment", comment); - - return referenceable; - } - - Id table(String name, String description, Id dbId, Referenceable sd, String owner, String tableType, - List<Referenceable> columns, String... traitNames) throws Exception { - Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames); - referenceable.set("name", name); - referenceable.set("description", description); - referenceable.set("owner", owner); - referenceable.set("tableType", tableType); - referenceable.set("temporary", false); - referenceable.set("createTime", new Date(System.currentTimeMillis())); - referenceable.set("lastAccessTime", System.currentTimeMillis()); - referenceable.set("retention", System.currentTimeMillis()); - - referenceable.set("db", dbId); - // todo - uncomment this, something is broken - referenceable.set("sd", sd); - referenceable.set("columns", columns); - - ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, HIVE_TABLE_TYPE); - return createInstance(referenceable, clsType); - } - - Id loadProcess(String name, String description, String user, List<Id> inputTables, List<Id> outputTables, - String queryText, String queryPlan, String queryId, String queryGraph, String... 
traitNames) - throws Exception { - Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames); - referenceable.set(AtlasClient.NAME, name); - referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name); - referenceable.set("description", description); - referenceable.set("user", user); - referenceable.set("startTime", System.currentTimeMillis()); - referenceable.set("endTime", System.currentTimeMillis() + 10000); - - referenceable.set("inputs", inputTables); - referenceable.set("outputs", outputTables); - - referenceable.set("queryText", queryText); - referenceable.set("queryPlan", queryPlan); - referenceable.set("queryId", queryId); - referenceable.set("queryGraph", queryGraph); - - ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, HIVE_PROCESS_TYPE); - return createInstance(referenceable, clsType); - } - - Id view(String name, Id dbId, List<Id> inputTables, String... traitNames) throws Exception { - Referenceable referenceable = new Referenceable(VIEW_TYPE, traitNames); - referenceable.set("name", name); - referenceable.set("db", dbId); - - referenceable.set("inputTables", inputTables); - ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, VIEW_TYPE); - return createInstance(referenceable, clsType); - } - - Id partition(List<String> values, Id table, String... traitNames) throws Exception { - Referenceable referenceable = new Referenceable(PARTITION_TYPE, traitNames); - referenceable.set("values", values); - referenceable.set("table", table); - ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, PARTITION_TYPE); - return createInstance(referenceable, clsType); - } - private Id createInstance(Referenceable referenceable, ClassType clsType) throws Exception { - ITypedReferenceableInstance typedInstance = clsType.convert(referenceable, Multiplicity.REQUIRED); - List<String> guids = repository.createEntities(typedInstance); - - // return the reference to created instance with guid - return new Id(guids.get(guids.size() - 1), 0, referenceable.getTypeName()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java b/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java new file mode 100644 index 0000000..d1f9430 --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java @@ -0,0 +1,377 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.thinkaurelius.titan.core.TitanGraph; +import com.thinkaurelius.titan.core.util.TitanCleanup; + +import org.apache.atlas.repository.MetadataRepository; +import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; +import org.apache.atlas.repository.graph.GraphProvider; +import org.apache.atlas.services.MetadataService; +import org.apache.atlas.typesystem.ITypedReferenceableInstance; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.typesystem.TypesDef; +import org.apache.atlas.typesystem.json.TypesSerialization; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.atlas.typesystem.types.AttributeDefinition; +import org.apache.atlas.typesystem.types.ClassType; +import org.apache.atlas.typesystem.types.DataTypes; +import org.apache.atlas.typesystem.types.EnumTypeDefinition; +import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition; +import org.apache.atlas.typesystem.types.IDataType; +import org.apache.atlas.typesystem.types.Multiplicity; +import org.apache.atlas.typesystem.types.StructTypeDefinition; +import org.apache.atlas.typesystem.types.TraitType; +import org.apache.atlas.typesystem.types.TypeSystem; +import org.apache.atlas.typesystem.types.utils.TypesUtil; +import org.testng.annotations.Guice; + +import javax.inject.Inject; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +/** + * Base Class to set up hive types and instances for tests + */ +@Guice(modules = RepositoryMetadataModule.class) +public class BaseRepositoryTest { + + @Inject + protected MetadataService metadataService; + + @Inject + protected MetadataRepository repository; + + @Inject + protected GraphProvider<TitanGraph> graphProvider; + + protected void setUp() throws Exception { + setUpTypes(); + new GraphBackedSearchIndexer(graphProvider); + RequestContext.createContext(); + setupInstances(); + TestUtils.dumpGraph(graphProvider.get()); + } + + protected void tearDown() throws Exception { + TypeSystem.getInstance().reset(); + try { + graphProvider.get().shutdown(); + } catch (Exception e) { + e.printStackTrace(); + } + try { + TitanCleanup.clear(graphProvider.get()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + private void setUpTypes() throws Exception { + TypesDef typesDef = createTypeDefinitions(); + String typesAsJSON = TypesSerialization.toJson(typesDef); + metadataService.createType(typesAsJSON); + } + + protected static final String DATABASE_TYPE = "hive_db"; + protected static final String HIVE_TABLE_TYPE = "hive_table"; + private static final String COLUMN_TYPE = "hive_column"; + private static final String HIVE_PROCESS_TYPE = "hive_process"; + private static final String STORAGE_DESC_TYPE = "StorageDesc"; + private static final String VIEW_TYPE = "View"; + private static final String PARTITION_TYPE = "hive_partition"; + + TypesDef createTypeDefinitions() { + HierarchicalTypeDefinition<ClassType> dbClsDef = TypesUtil + .createClassTypeDef(DATABASE_TYPE, null, + TypesUtil.createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE), + attrDef("description", DataTypes.STRING_TYPE), attrDef("locationUri", DataTypes.STRING_TYPE), + attrDef("owner", DataTypes.STRING_TYPE), attrDef("createTime", DataTypes.LONG_TYPE)); + + HierarchicalTypeDefinition<ClassType> columnClsDef = TypesUtil + .createClassTypeDef(COLUMN_TYPE, null, 
attrDef("name", DataTypes.STRING_TYPE), + attrDef("dataType", DataTypes.STRING_TYPE), attrDef("comment", DataTypes.STRING_TYPE)); + + HierarchicalTypeDefinition<ClassType> storageDescClsDef = TypesUtil + .createClassTypeDef(STORAGE_DESC_TYPE, null, + attrDef("location", DataTypes.STRING_TYPE), + attrDef("inputFormat", DataTypes.STRING_TYPE), attrDef("outputFormat", DataTypes.STRING_TYPE), + attrDef("compressed", DataTypes.STRING_TYPE, Multiplicity.REQUIRED, false, null)); + + + HierarchicalTypeDefinition<ClassType> tblClsDef = TypesUtil + .createClassTypeDef(HIVE_TABLE_TYPE, ImmutableSet.of("DataSet"), + attrDef("owner", DataTypes.STRING_TYPE), + attrDef("createTime", DataTypes.DATE_TYPE), + attrDef("lastAccessTime", DataTypes.LONG_TYPE), attrDef("tableType", DataTypes.STRING_TYPE), + attrDef("temporary", DataTypes.BOOLEAN_TYPE), + new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null), + // todo - uncomment this, something is broken + new AttributeDefinition("sd", STORAGE_DESC_TYPE, Multiplicity.REQUIRED, true, null), + new AttributeDefinition("columns", DataTypes.arrayTypeName(COLUMN_TYPE), + Multiplicity.COLLECTION, true, null)); + + HierarchicalTypeDefinition<ClassType> loadProcessClsDef = TypesUtil + .createClassTypeDef(HIVE_PROCESS_TYPE, ImmutableSet.of("Process"), + attrDef("userName", DataTypes.STRING_TYPE), attrDef("startTime", DataTypes.LONG_TYPE), + attrDef("endTime", DataTypes.LONG_TYPE), + attrDef("queryText", DataTypes.STRING_TYPE, Multiplicity.REQUIRED), + attrDef("queryPlan", DataTypes.STRING_TYPE, Multiplicity.REQUIRED), + attrDef("queryId", DataTypes.STRING_TYPE, Multiplicity.REQUIRED), + attrDef("queryGraph", DataTypes.STRING_TYPE, Multiplicity.REQUIRED)); + + HierarchicalTypeDefinition<ClassType> viewClsDef = TypesUtil + .createClassTypeDef(VIEW_TYPE, null, attrDef("name", DataTypes.STRING_TYPE), + new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null), + new AttributeDefinition("inputTables", DataTypes.arrayTypeName(HIVE_TABLE_TYPE), + Multiplicity.COLLECTION, false, null)); + + AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ + new AttributeDefinition("values", DataTypes.arrayTypeName(DataTypes.STRING_TYPE.getName()), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("table", HIVE_TABLE_TYPE, Multiplicity.REQUIRED, false, null), + }; + HierarchicalTypeDefinition<ClassType> partClsDef = + new HierarchicalTypeDefinition<>(ClassType.class, PARTITION_TYPE, null, null, + attributeDefinitions); + + HierarchicalTypeDefinition<TraitType> dimTraitDef = TypesUtil.createTraitTypeDef("Dimension", null); + + HierarchicalTypeDefinition<TraitType> factTraitDef = TypesUtil.createTraitTypeDef("Fact", null); + + HierarchicalTypeDefinition<TraitType> metricTraitDef = TypesUtil.createTraitTypeDef("Metric", null); + + HierarchicalTypeDefinition<TraitType> etlTraitDef = TypesUtil.createTraitTypeDef("ETL", null); + + HierarchicalTypeDefinition<TraitType> piiTraitDef = TypesUtil.createTraitTypeDef("PII", null); + + HierarchicalTypeDefinition<TraitType> jdbcTraitDef = TypesUtil.createTraitTypeDef("JdbcAccess", null); + + HierarchicalTypeDefinition<TraitType> logTraitDef = TypesUtil.createTraitTypeDef("Log Data", null); + + return TypesUtil.getTypesDef(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(), + ImmutableList.of(dimTraitDef, factTraitDef, piiTraitDef, metricTraitDef, etlTraitDef, jdbcTraitDef, logTraitDef), + ImmutableList.of(dbClsDef, storageDescClsDef, 
+                columnClsDef, tblClsDef, loadProcessClsDef, viewClsDef, partClsDef));
+    }
+
+    AttributeDefinition attrDef(String name, IDataType dT) {
+        return attrDef(name, dT, Multiplicity.OPTIONAL, false, null);
+    }
+
+    AttributeDefinition attrDef(String name, IDataType dT, Multiplicity m) {
+        return attrDef(name, dT, m, false, null);
+    }
+
+    AttributeDefinition attrDef(String name, IDataType dT, Multiplicity m, boolean isComposite,
+        String reverseAttributeName) {
+        Preconditions.checkNotNull(name);
+        Preconditions.checkNotNull(dT);
+        return new AttributeDefinition(name, dT.getName(), m, isComposite, reverseAttributeName);
+    }
+
+    private void setupInstances() throws Exception {
+        Id salesDB = database("Sales", "Sales Database", "John ETL", "hdfs://host:8000/apps/warehouse/sales");
+
+        Referenceable sd =
+            storageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true, ImmutableList.of(
+                column("time_id", "int", "time id")));
+
+        List<Referenceable> salesFactColumns = ImmutableList
+            .of(column("time_id", "int", "time id"),
+                column("product_id", "int", "product id"),
+                column("customer_id", "int", "customer id", "PII"),
+                column("sales", "double", "product id", "Metric"));
+
+        Id salesFact = table("sales_fact", "sales fact table", salesDB, sd, "Joe", "Managed", salesFactColumns, "Fact");
+
+        List<Referenceable> logFactColumns = ImmutableList
+            .of(column("time_id", "int", "time id"), column("app_id", "int", "app id"),
+                column("machine_id", "int", "machine id"), column("log", "string", "log data", "Log Data"));
+
+        List<Referenceable> timeDimColumns = ImmutableList
+            .of(column("time_id", "int", "time id"),
+                column("dayOfYear", "int", "day Of Year"),
+                column("weekDay", "int", "week Day"));
+
+        Id timeDim = table("time_dim", "time dimension table", salesDB, sd, "John Doe", "External", timeDimColumns,
+            "Dimension");
+
+        Id reportingDB =
+            database("Reporting", "reporting database", "Jane BI", "hdfs://host:8000/apps/warehouse/reporting");
+
+        Id salesFactDaily =
+            table("sales_fact_daily_mv", "sales fact daily materialized view", reportingDB, sd, "Joe BI", "Managed",
+                salesFactColumns, "Metric");
+
+        loadProcess("loadSalesDaily", "hive query for daily summary", "John ETL", ImmutableList.of(salesFact, timeDim),
+            ImmutableList.of(salesFactDaily), "create table as select ", "plan", "id", "graph", "ETL");
+
+        Id logDB = database("Logging", "logging database", "Tim ETL", "hdfs://host:8000/apps/warehouse/logging");
+
+        Id loggingFactDaily =
+            table("log_fact_daily_mv", "log fact daily materialized view", logDB, sd, "Tim ETL", "Managed",
+                logFactColumns, "Log Data");
+
+        List<Referenceable> productDimColumns = ImmutableList
+            .of(column("product_id", "int", "product id"),
+                column("product_name", "string", "product name"),
+                column("brand_name", "int", "brand name"));
+
+        Id productDim =
+            table("product_dim", "product dimension table", salesDB, sd, "John Doe", "Managed", productDimColumns,
+                "Dimension");
+
+        view("product_dim_view", reportingDB, ImmutableList.of(productDim), "Dimension", "JdbcAccess");
+
+        List<Referenceable> customerDimColumns = ImmutableList.of(
+            column("customer_id", "int", "customer id", "PII"),
+            column("name", "string", "customer name", "PII"),
+            column("address", "string", "customer address", "PII"));
+
+        Id customerDim =
+            table("customer_dim", "customer dimension table", salesDB, sd, "fetl", "External", customerDimColumns,
+                "Dimension");
+
+        view("customer_dim_view", reportingDB, ImmutableList.of(customerDim), "Dimension",
"JdbcAccess"); + + Id salesFactMonthly = + table("sales_fact_monthly_mv", "sales fact monthly materialized view", reportingDB, sd, "Jane BI", + "Managed", salesFactColumns, "Metric"); + + loadProcess("loadSalesMonthly", "hive query for monthly summary", "John ETL", ImmutableList.of(salesFactDaily), + ImmutableList.of(salesFactMonthly), "create table as select ", "plan", "id", "graph", "ETL"); + + Id loggingFactMonthly = + table("logging_fact_monthly_mv", "logging fact monthly materialized view", logDB, sd, "Tim ETL", + "Managed", logFactColumns, "Log Data"); + + loadProcess("loadLogsMonthly", "hive query for monthly summary", "Tim ETL", ImmutableList.of(loggingFactDaily), + ImmutableList.of(loggingFactMonthly), "create table as select ", "plan", "id", "graph", "ETL"); + + partition(new ArrayList() {{ add("2015-01-01"); }}, salesFactDaily); + } + + Id database(String name, String description, String owner, String locationUri, String... traitNames) + throws Exception { + Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames); + referenceable.set("name", name); + referenceable.set("description", description); + referenceable.set("owner", owner); + referenceable.set("locationUri", locationUri); + referenceable.set("createTime", System.currentTimeMillis()); + + ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, DATABASE_TYPE); + return createInstance(referenceable, clsType); + } + + protected Referenceable storageDescriptor(String location, String inputFormat, String outputFormat, boolean compressed, List<Referenceable> columns) + throws Exception { + Referenceable referenceable = new Referenceable(STORAGE_DESC_TYPE); + referenceable.set("location", location); + referenceable.set("inputFormat", inputFormat); + referenceable.set("outputFormat", outputFormat); + referenceable.set("compressed", compressed); + referenceable.set("cols", columns); + + return referenceable; + } + + protected Referenceable column(String name, String dataType, String comment, String... traitNames) throws Exception { + Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames); + referenceable.set("name", name); + referenceable.set("dataType", dataType); + referenceable.set("comment", comment); + + return referenceable; + } + + protected Id table(String name, String description, Id dbId, Referenceable sd, String owner, String tableType, + List<Referenceable> columns, String... traitNames) throws Exception { + Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames); + referenceable.set("name", name); + referenceable.set("description", description); + referenceable.set("owner", owner); + referenceable.set("tableType", tableType); + referenceable.set("temporary", false); + referenceable.set("createTime", new Date(System.currentTimeMillis())); + referenceable.set("lastAccessTime", System.currentTimeMillis()); + referenceable.set("retention", System.currentTimeMillis()); + + referenceable.set("db", dbId); + // todo - uncomment this, something is broken + referenceable.set("sd", sd); + referenceable.set("columns", columns); + + ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, HIVE_TABLE_TYPE); + return createInstance(referenceable, clsType); + } + + protected Id loadProcess(String name, String description, String user, List<Id> inputTables, List<Id> outputTables, + String queryText, String queryPlan, String queryId, String queryGraph, String... 
+                             traitNames)
+        throws Exception {
+        Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames);
+        referenceable.set("name", name);
+        referenceable.set("qualifiedName", name);
+        referenceable.set("description", description);
+        referenceable.set("user", user);
+        referenceable.set("startTime", System.currentTimeMillis());
+        referenceable.set("endTime", System.currentTimeMillis() + 10000);
+
+        referenceable.set("inputs", inputTables);
+        referenceable.set("outputs", outputTables);
+
+        referenceable.set("queryText", queryText);
+        referenceable.set("queryPlan", queryPlan);
+        referenceable.set("queryId", queryId);
+        referenceable.set("queryGraph", queryGraph);
+
+        ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, HIVE_PROCESS_TYPE);
+        return createInstance(referenceable, clsType);
+    }
+
+    Id view(String name, Id dbId, List<Id> inputTables, String... traitNames) throws Exception {
+        Referenceable referenceable = new Referenceable(VIEW_TYPE, traitNames);
+        referenceable.set("name", name);
+        referenceable.set("db", dbId);
+
+        referenceable.set("inputTables", inputTables);
+        ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, VIEW_TYPE);
+        return createInstance(referenceable, clsType);
+    }
+
+    Id partition(List<String> values, Id table, String... traitNames) throws Exception {
+        Referenceable referenceable = new Referenceable(PARTITION_TYPE, traitNames);
+        referenceable.set("values", values);
+        referenceable.set("table", table);
+        ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, PARTITION_TYPE);
+        return createInstance(referenceable, clsType);
+    }
+    private Id createInstance(Referenceable referenceable, ClassType clsType) throws Exception {
+        ITypedReferenceableInstance typedInstance = clsType.convert(referenceable, Multiplicity.REQUIRED);
+        List<String> guids = repository.createEntities(typedInstance);
+
+        // return the reference to created instance with guid
+        return new Id(guids.get(guids.size() - 1), 0, referenceable.getTypeName());
+    }
+}
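
For orientation, here is how a suite would typically hook into this base class. This is a minimal sketch, not part of the commit: the class name ExampleLineageTest and the specific assertion are illustrative, and it assumes LineageService exposes the name-based getInputsGraph(String) used elsewhere in this change.

    package org.apache.atlas.discovery;

    import javax.inject.Inject;

    import org.apache.atlas.BaseRepositoryTest;
    import org.testng.Assert;
    import org.testng.annotations.AfterClass;
    import org.testng.annotations.BeforeClass;
    import org.testng.annotations.Test;

    // Hypothetical test showing the intended use of BaseRepositoryTest: the base
    // class registers the hive_* types and loads the sales/reporting instances,
    // so a subclass only supplies assertions against that fixture.
    public class ExampleLineageTest extends BaseRepositoryTest {

        @Inject
        private LineageService lineageService;

        @BeforeClass
        public void setUp() throws Exception {
            super.setUp();    // create types, index the graph, create instances
        }

        @AfterClass
        public void tearDown() throws Exception {
            super.tearDown(); // reset the type system and clear the Titan graph
        }

        @Test
        public void testInputsGraph() throws Exception {
            // sales_fact_monthly_mv is wired up in setupInstances() via the
            // loadSalesDaily and loadSalesMonthly processes, so its input
            // lineage should be non-empty.
            String inputsGraph = lineageService.getInputsGraph("sales_fact_monthly_mv");
            Assert.assertNotNull(inputsGraph);
        }
    }

Because BaseRepositoryTest is annotated with @Guice(modules = RepositoryMetadataModule.class), subclasses get the same injector, so additional services such as LineageService can simply be @Inject-ed.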

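For completeness, the client-side view of the new guid-based lineage endpoints. A sketch under assumptions: the AtlasClient method names below (getInputsGraphForEntity, getOutputsGraphForEntity) are inferred from the LINEAGE_INPUTS_GRAPH/LINEAGE_OUTPUTS_GRAPH operations and the new URI_LINEAGE prefix; they are not quoted from the diff, so verify them against the committed AtlasClient before relying on this.

    import org.apache.atlas.AtlasClient;
    import org.codehaus.jettison.json.JSONObject;

    // Hypothetical driver for the entity-id-based lineage API added by this
    // commit: GET api/atlas/lineage/{guid}/inputs/graph and .../outputs/graph,
    // as opposed to the older name-based lineage/hive/table/{name} endpoints.
    public class EntityLineageExample {
        public static void main(String[] args) throws Exception {
            AtlasClient client = new AtlasClient("http://localhost:21000");

            String guid = args[0]; // guid of a DataSet entity, e.g. a hive_table

            // Assumed method names mirroring the new enum operations.
            JSONObject inputs = client.getInputsGraphForEntity(guid);
            JSONObject outputs = client.getOutputsGraphForEntity(guid);

            System.out.println("inputs : " + inputs);
            System.out.println("outputs: " + outputs);
        }
    }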