http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java new file mode 100644 index 0000000..9e1a92c --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance.analyzer; + +import org.apache.atlas.typesystem.Referenceable; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer; +import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory; +import org.apache.nifi.atlas.resolver.ClusterResolvers; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.ATTR_INPUT_TABLES; +import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.ATTR_OUTPUT_TABLES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.matches; +import static org.mockito.Mockito.when; +
/**
 * Checks the data set references expected for {@code PutHiveQL} SEND events
 * whose transit URI uses the {@code jdbc:hive2} scheme.
 */
public class TestHive2JDBC {

    private static final String PROCESSOR_NAME = "PutHiveQL";

    /**
     * When the event carries no table name attributes, only a database level
     * output ({@code hive_db}) is expected and no inputs.
     */
    @Test
    public void testDatabaseLineage() {
        final String uri = "jdbc:hive2://0.example.com:10000/databaseA";
        final ProvenanceEventRecord event = mockSendEvent(uri);

        final DataSetRefs result = analyze(event, uri);

        assertEquals(0, result.getInputs().size());
        assertEquals(1, result.getOutputs().size());
        final Referenceable database = result.getOutputs().iterator().next();
        assertEquals("hive_db", database.getTypeName());
        assertEquals("databaseA", database.get(ATTR_NAME));
        assertEquals("databaseA@cluster1", database.get(ATTR_QUALIFIED_NAME));
    }

    /**
     * When the event carries table name attributes, table level references are expected.
     * The unqualified input tables are expected under the database named in the transit URI.
     */
    @Test
    public void testTableLineage() {
        final String uri = "jdbc:hive2://0.example.com:10000/databaseA";
        final ProvenanceEventRecord event = mockSendEvent(uri);
        stubTableAttributes(event);

        final DataSetRefs result = analyze(event, uri);

        // Qualified name -> name
        final Map<String, String> expectedInputs = new HashMap<>();
        expectedInputs.put("databaseA.tableA1@cluster1", "tableA1");
        expectedInputs.put("databaseA.tableA2@cluster1", "tableA2");
        assertInputTables(result, expectedInputs);
        assertOutputTableB1(result);
    }

    /**
     * Same as {@link #testTableLineage()}, but the transit URI names no database,
     * so the unqualified input tables are expected under the 'default' database.
     */
    @Test
    public void testTableLineageWithDefaultTableName() {
        final String uri = "jdbc:hive2://0.example.com:10000";
        final ProvenanceEventRecord event = mockSendEvent(uri);
        stubTableAttributes(event);

        final DataSetRefs result = analyze(event, uri);

        // Qualified name -> name
        final Map<String, String> expectedInputs = new HashMap<>();
        expectedInputs.put("default.tableA1@cluster1", "tableA1");
        expectedInputs.put("default.tableA2@cluster1", "tableA2");
        assertInputTables(result, expectedInputs);
        assertOutputTableB1(result);
    }

    /** Creates a mocked PutHiveQL SEND event reporting the given transit URI. */
    private static ProvenanceEventRecord mockSendEvent(final String transitUri) {
        final ProvenanceEventRecord event = Mockito.mock(ProvenanceEventRecord.class);
        when(event.getComponentType()).thenReturn(PROCESSOR_NAME);
        when(event.getTransitUri()).thenReturn(transitUri);
        when(event.getEventType()).thenReturn(ProvenanceEventType.SEND);
        return event;
    }

    /** Stubs the input and output table attributes on the mocked event. */
    private static void stubTableAttributes(final ProvenanceEventRecord event) {
        // E.g. insert into databaseB.tableB1 select something from tableA1 a1 inner join tableA2 a2 where a1.id = a2.id
        when(event.getAttribute(ATTR_INPUT_TABLES)).thenReturn("tableA1, tableA2");
        when(event.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseB.tableB1");
    }

    /**
     * Mocks a context whose cluster resolver maps *.example.com hosts to "cluster1",
     * looks up the analyzer for the event and runs it.
     */
    private static DataSetRefs analyze(final ProvenanceEventRecord event, final String transitUri) {
        final ClusterResolvers resolvers = Mockito.mock(ClusterResolvers.class);
        when(resolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");

        final AnalysisContext analysisContext = Mockito.mock(AnalysisContext.class);
        when(analysisContext.getClusterResolver()).thenReturn(resolvers);

        final NiFiProvenanceEventAnalyzer eventAnalyzer =
                NiFiProvenanceEventAnalyzerFactory.getAnalyzer(PROCESSOR_NAME, transitUri, event.getEventType());
        assertNotNull(eventAnalyzer);

        return eventAnalyzer.analyze(analysisContext, event);
    }

    /** Asserts that exactly two inputs exist and that each one matches an expected qualified name / name pair. */
    private static void assertInputTables(final DataSetRefs result, final Map<String, String> namesByQualifiedName) {
        assertEquals(2, result.getInputs().size());
        for (Referenceable input : result.getInputs()) {
            final String qualifiedName = (String) input.get(ATTR_QUALIFIED_NAME);
            assertTrue(namesByQualifiedName.containsKey(qualifiedName));
            assertEquals(namesByQualifiedName.get(qualifiedName), input.get(ATTR_NAME));
        }
    }

    /** Asserts that the only output is the hive_table reference for databaseB.tableB1. */
    private static void assertOutputTableB1(final DataSetRefs result) {
        assertEquals(1, result.getOutputs().size());
        final Referenceable table = result.getOutputs().iterator().next();
        assertEquals("hive_table", table.getTypeName());
        assertEquals("tableB1", table.get(ATTR_NAME));
        assertEquals("databaseB.tableB1@cluster1", table.get(ATTR_QUALIFIED_NAME));
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestKafkaTopic.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestKafkaTopic.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestKafkaTopic.java new file mode 100644 index 0000000..5c0fd0e --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestKafkaTopic.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance.analyzer; + +import org.apache.atlas.typesystem.Referenceable; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer; +import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory; +import org.apache.nifi.atlas.resolver.ClusterResolvers; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.matches; +import static org.mockito.Mockito.when; +
/**
 * Checks the data set references expected for Kafka publish (SEND) and consume (RECEIVE) events.
 */
public class TestKafkaTopic {

    /** A publish event with a single broker in the transit URI is expected to yield one output for topicA. */
    @Test
    public void testPublishKafka() {
        final DataSetRefs result =
                analyze("PublishKafka", "PLAINTEXT://0.example.com:6667/topicA", ProvenanceEventType.SEND);
        assertSingleOutputTopicA(result);
    }

    /** A publish event listing two brokers in the transit URI is expected to yield the same single output. */
    @Test
    public void testPublishKafkaMultipleBrokers() {
        final DataSetRefs result =
                analyze("PublishKafka", "PLAINTEXT://0.example.com:6667,1.example.com:6667/topicA", ProvenanceEventType.SEND);
        assertSingleOutputTopicA(result);
    }

    /** A consume event is expected to yield one kafka_topic input for topicA and no outputs. */
    @Test
    public void testConsumeKafka() {
        final DataSetRefs result =
                analyze("ConsumeKafka", "PLAINTEXT://0.example.com:6667/topicA", ProvenanceEventType.RECEIVE);
        assertSingleInputTopicA(result);
    }

    /** The record-oriented 0.10 consumer is expected to yield the same references as {@code ConsumeKafka}. */
    @Test
    public void testConsumeKafkaRecord_0_10() {
        final DataSetRefs result =
                analyze("ConsumeKafkaRecord_0_10", "PLAINTEXT://0.example.com:6667/topicA", ProvenanceEventType.RECEIVE);
        assertSingleInputTopicA(result);
    }

    /**
     * Mocks the event and a context whose cluster resolver maps *.example.com hosts to "cluster1",
     * then looks up the analyzer and runs it.
     */
    private static DataSetRefs analyze(final String processorName, final String transitUri,
                                       final ProvenanceEventType eventType) {
        final ProvenanceEventRecord event = Mockito.mock(ProvenanceEventRecord.class);
        when(event.getComponentType()).thenReturn(processorName);
        when(event.getTransitUri()).thenReturn(transitUri);
        when(event.getEventType()).thenReturn(eventType);

        final ClusterResolvers resolvers = Mockito.mock(ClusterResolvers.class);
        when(resolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");

        final AnalysisContext analysisContext = Mockito.mock(AnalysisContext.class);
        when(analysisContext.getClusterResolver()).thenReturn(resolvers);

        final NiFiProvenanceEventAnalyzer eventAnalyzer =
                NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, event.getEventType());
        assertNotNull(eventAnalyzer);

        return eventAnalyzer.analyze(analysisContext, event);
    }

    /** Asserts no inputs and exactly one output describing topicA. */
    private static void assertSingleOutputTopicA(final DataSetRefs result) {
        assertEquals(0, result.getInputs().size());
        assertEquals(1, result.getOutputs().size());
        assertTopicA(result.getOutputs().iterator().next());
    }

    /** Asserts no outputs and exactly one kafka_topic input describing topicA. */
    private static void assertSingleInputTopicA(final DataSetRefs result) {
        assertEquals(1, result.getInputs().size());
        assertEquals(0, result.getOutputs().size());
        final Referenceable topic = result.getInputs().iterator().next();
        assertEquals("kafka_topic", topic.getTypeName());
        assertTopicA(topic);
    }

    /** Asserts the name, topic and qualified name attributes of the topicA reference. */
    private static void assertTopicA(final Referenceable topic) {
        assertEquals("topicA", topic.get(ATTR_NAME));
        assertEquals("topicA", topic.get("topic"));
        assertEquals("topicA@cluster1", topic.get(ATTR_QUALIFIED_NAME));
    }

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java new file mode 100644 index 0000000..3040d50 --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance.analyzer; + +import org.apache.atlas.typesystem.Referenceable; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer; +import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory; +import org.apache.nifi.atlas.reporting.ITReportLineageToAtlas; +import org.apache.nifi.atlas.resolver.ClusterResolvers; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.matches; +import static org.mockito.Mockito.when; +
/**
 * Unit tests for Remote Input Port and Remote Output Port events.
 * {@link ITReportLineageToAtlas} holds the more complex and detailed tests.
 */
public class TestNiFiRemotePort {

    /**
     * A SEND event from a Remote Input Port is expected to yield a single input port output
     * and to report the port's component id.
     */
    @Test
    public void testRemoteInputPort() {
        final String portType = "Remote Input Port";
        final String uri = "http://0.example.com:8080/nifi-api/data-transfer/input-ports/port-guid/transactions/tx-guid/flow-files";

        final ProvenanceEventRecord sendEvent = Mockito.mock(ProvenanceEventRecord.class);
        when(sendEvent.getEventId()).thenReturn(123L);
        when(sendEvent.getComponentId()).thenReturn("port-guid");
        when(sendEvent.getComponentType()).thenReturn(portType);
        when(sendEvent.getTransitUri()).thenReturn(uri);
        when(sendEvent.getEventType()).thenReturn(ProvenanceEventType.SEND);

        // The connection that ends at the port supplies the port name.
        final ConnectionStatus incoming = new ConnectionStatus();
        incoming.setDestinationId("port-guid");
        incoming.setDestinationName("inputPortA");
        final List<ConnectionStatus> incomingConnections = new ArrayList<>();
        incomingConnections.add(incoming);

        final AnalysisContext analysisContext = mockContext();
        when(analysisContext.findConnectionTo(matches("port-guid"))).thenReturn(incomingConnections);

        final DataSetRefs result = analyze(analysisContext, sendEvent, portType, uri);

        assertEquals(0, result.getInputs().size());
        assertEquals(1, result.getOutputs().size());
        assertEquals(1, result.getComponentIds().size());
        // Should report connected componentId.
        assertTrue(result.getComponentIds().contains("port-guid"));

        final Referenceable port = result.getOutputs().iterator().next();
        assertEquals(TYPE_NIFI_INPUT_PORT, port.getTypeName());
        assertEquals("inputPortA", port.get(ATTR_NAME));
        assertEquals("port-guid@cluster1", port.get(ATTR_QUALIFIED_NAME));
    }

    /**
     * A RECEIVE event from a Remote Output Port is expected to yield a single output port input.
     */
    @Test
    public void testRemoteOutputPort() {
        final String portType = "Remote Output Port";
        final String uri = "http://0.example.com:8080/nifi-api/data-transfer/output-ports/port-guid/transactions/tx-guid/flow-files";

        final ProvenanceEventRecord receiveEvent = Mockito.mock(ProvenanceEventRecord.class);
        when(receiveEvent.getComponentId()).thenReturn("port-guid");
        when(receiveEvent.getComponentType()).thenReturn(portType);
        when(receiveEvent.getTransitUri()).thenReturn(uri);
        when(receiveEvent.getEventType()).thenReturn(ProvenanceEventType.RECEIVE);

        // The connection that starts at the port supplies the port name.
        final ConnectionStatus outgoing = new ConnectionStatus();
        outgoing.setSourceId("port-guid");
        outgoing.setSourceName("outputPortA");
        final List<ConnectionStatus> outgoingConnections = new ArrayList<>();
        outgoingConnections.add(outgoing);

        final AnalysisContext analysisContext = mockContext();
        when(analysisContext.findConnectionFrom(matches("port-guid"))).thenReturn(outgoingConnections);

        final DataSetRefs result = analyze(analysisContext, receiveEvent, portType, uri);

        assertEquals(1, result.getInputs().size());
        assertEquals(0, result.getOutputs().size());
        final Referenceable port = result.getInputs().iterator().next();
        assertEquals(TYPE_NIFI_OUTPUT_PORT, port.getTypeName());
        assertEquals("outputPortA", port.get(ATTR_NAME));
        assertEquals("port-guid@cluster1", port.get(ATTR_QUALIFIED_NAME));
    }

    /** Creates a mocked context whose cluster resolver maps *.example.com hosts to "cluster1". */
    private static AnalysisContext mockContext() {
        final ClusterResolvers resolvers = Mockito.mock(ClusterResolvers.class);
        when(resolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");

        final AnalysisContext analysisContext = Mockito.mock(AnalysisContext.class);
        when(analysisContext.getClusterResolver()).thenReturn(resolvers);
        return analysisContext;
    }

    /** Looks up the analyzer for the given component type and transit URI, then runs it on the event. */
    private static DataSetRefs analyze(final AnalysisContext analysisContext, final ProvenanceEventRecord event,
                                       final String componentType, final String transitUri) {
        final NiFiProvenanceEventAnalyzer eventAnalyzer =
                NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, event.getEventType());
        assertNotNull(eventAnalyzer);
        return eventAnalyzer.analyze(analysisContext, event);
    }

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java new file mode 100644 index 0000000..3398dfa --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance.analyzer; + +import org.apache.atlas.typesystem.Referenceable; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer; +import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory; +import org.apache.nifi.atlas.resolver.ClusterResolvers; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.ATTR_OUTPUT_TABLES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.matches; +import static org.mockito.Mockito.when; +
/**
 * Checks the data set reference expected for a {@code PutHiveStreaming} SEND event.
 */
public class TestPutHiveStreaming {

    /**
     * A SEND event carrying an output table attribute is expected to yield no inputs and
     * a single {@code hive_table} output qualified with the resolved cluster name.
     */
    @Test
    public void testTableLineage() {
        final String componentType = "PutHiveStreaming";
        final String uri = "thrift://0.example.com:9083";

        // Cluster resolver mapping *.example.com hosts to "cluster1", exposed through the context.
        final ClusterResolvers resolvers = Mockito.mock(ClusterResolvers.class);
        when(resolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
        final AnalysisContext analysisContext = Mockito.mock(AnalysisContext.class);
        when(analysisContext.getClusterResolver()).thenReturn(resolvers);

        // The event under analysis.
        final ProvenanceEventRecord sendEvent = Mockito.mock(ProvenanceEventRecord.class);
        when(sendEvent.getComponentType()).thenReturn(componentType);
        when(sendEvent.getTransitUri()).thenReturn(uri);
        when(sendEvent.getEventType()).thenReturn(ProvenanceEventType.SEND);
        when(sendEvent.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseA.tableA");

        final NiFiProvenanceEventAnalyzer eventAnalyzer =
                NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, uri, sendEvent.getEventType());
        assertNotNull(eventAnalyzer);

        final DataSetRefs result = eventAnalyzer.analyze(analysisContext, sendEvent);
        assertEquals(0, result.getInputs().size());
        assertEquals(1, result.getOutputs().size());

        final Referenceable table = result.getOutputs().iterator().next();
        assertEquals("hive_table", table.getTypeName());
        assertEquals("tableA", table.get(ATTR_NAME));
        assertEquals("databaseA.tableA@cluster1", table.get(ATTR_QUALIFIED_NAME));
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestUnknownDataSet.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestUnknownDataSet.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestUnknownDataSet.java new file mode 100644 index 0000000..f4cfe0d --- /dev/null +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestUnknownDataSet.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.atlas.provenance.analyzer; + +import org.apache.atlas.typesystem.Referenceable; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer; +import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory; +import org.apache.nifi.atlas.resolver.ClusterResolvers; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.mockito.Matchers.matches; +import static org.mockito.Mockito.when; +
/**
 * Checks the analysis of CREATE events that carry no transit URI, with and without
 * connections leading into the processor.
 */
public class TestUnknownDataSet {

    private static final String PROCESSOR_ID = "processor-1234";

    /**
     * With no connection leading into the processor, a single {@code nifi_data} input named
     * after the processor and qualified with the NiFi cluster name is expected.
     */
    @Test
    public void testGenerateFlowFile() {
        final String componentType = "GenerateFlowFile";
        final ProvenanceEventRecord createEvent = mockCreateEvent(componentType);

        final List<ConnectionStatus> noConnections = new ArrayList<>();
        final AnalysisContext analysisContext = mockContext(noConnections);
        when(analysisContext.getNiFiClusterName()).thenReturn("nifi-cluster");

        final DataSetRefs result = analyze(analysisContext, createEvent, componentType);

        assertEquals(1, result.getInputs().size());
        assertEquals(0, result.getOutputs().size());
        final Referenceable dataSet = result.getInputs().iterator().next();
        assertEquals("nifi_data", dataSet.getTypeName());
        assertEquals("GenerateFlowFile", dataSet.get(ATTR_NAME));
        assertEquals("processor-1234@nifi-cluster", dataSet.get(ATTR_QUALIFIED_NAME));
    }

    /**
     * With at least one connection leading into the processor, the analysis is expected to return null.
     */
    @Test
    public void testSomethingHavingIncomingConnection() {
        final String componentType = "SomeProcessor";
        final ProvenanceEventRecord createEvent = mockCreateEvent(componentType);

        final List<ConnectionStatus> incomingConnections = new ArrayList<>();
        // The content of connection is not important, just create an empty status.
        incomingConnections.add(new ConnectionStatus());
        final AnalysisContext analysisContext = mockContext(incomingConnections);

        final DataSetRefs result = analyze(analysisContext, createEvent, componentType);

        assertNull("If the processor has incoming connections, no refs should be created", result);
    }

    /** Creates a mocked CREATE event for the given component type and the fixed processor id. */
    private static ProvenanceEventRecord mockCreateEvent(final String componentType) {
        final ProvenanceEventRecord event = Mockito.mock(ProvenanceEventRecord.class);
        when(event.getComponentType()).thenReturn(componentType);
        when(event.getComponentId()).thenReturn(PROCESSOR_ID);
        when(event.getEventType()).thenReturn(ProvenanceEventType.CREATE);
        return event;
    }

    /**
     * Creates a mocked context that resolves *.example.com hosts to "cluster1" and returns
     * the given connections as the ones leading into the fixed processor id.
     */
    private static AnalysisContext mockContext(final List<ConnectionStatus> connectionsToProcessor) {
        final ClusterResolvers resolvers = Mockito.mock(ClusterResolvers.class);
        when(resolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");

        final AnalysisContext analysisContext = Mockito.mock(AnalysisContext.class);
        when(analysisContext.getClusterResolver()).thenReturn(resolvers);
        when(analysisContext.findConnectionTo(PROCESSOR_ID)).thenReturn(connectionsToProcessor);
        return analysisContext;
    }

    /** Looks up the analyzer for the component type (no transit URI) and runs it on the event. */
    private static DataSetRefs analyze(final AnalysisContext analysisContext, final ProvenanceEventRecord event,
                                       final String componentType) {
        final NiFiProvenanceEventAnalyzer eventAnalyzer =
                NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, null, event.getEventType());
        assertNotNull(eventAnalyzer);
        return eventAnalyzer.analyze(analysisContext, event);
    }

}
