This is an automated email from the ASF dual-hosted git repository. nixon pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit f9d4b22d4f07bdcd4df3cce1e9b1c34c764bec2f Author: Andras Katona <[email protected]> AuthorDate: Thu Dec 3 16:48:14 2020 +0100 ATLAS-3864: Follow up change, removed remaining usage of zkclient even from poms Also removing kafka core dependency from kafka-bridge since it's not used any more Signed-off-by: nixonrodrigues <[email protected]> (cherry picked from commit ab92cf1e6db021e83d8b42ac559f077ab17afa65) --- addons/kafka-bridge/pom.xml | 19 --- .../org/apache/atlas/kafka/bridge/KafkaBridge.java | 21 +-- .../apache/atlas/kafka/bridge/KafkaBridgeTest.java | 170 +++++++++++---------- notification/pom.xml | 10 -- pom.xml | 7 - 5 files changed, 99 insertions(+), 128 deletions(-) diff --git a/addons/kafka-bridge/pom.xml b/addons/kafka-bridge/pom.xml index 8c252d9..2f12c5e 100644 --- a/addons/kafka-bridge/pom.xml +++ b/addons/kafka-bridge/pom.xml @@ -103,20 +103,6 @@ </dependency> <dependency> - <groupId>com.101tec</groupId> - <artifactId>zkclient</artifactId> - <version>${zkclient.version}</version> - <scope>compile</scope> - </dependency> - - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_${kafka.scala.binary.version}</artifactId> - <version>${kafka.version}</version> - <scope>compile</scope> - </dependency> - - <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-webapp</artifactId> <version>${jetty.version}</version> @@ -206,11 +192,6 @@ </artifactItem> <artifactItem> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_${kafka.scala.binary.version}</artifactId> - <version>${kafka.version}</version> - </artifactItem> - <artifactItem> - <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </artifactItem> diff --git a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java index 833a077..d22010d 100644 --- a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java +++ b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java @@ -82,7 +82,7 @@ public class KafkaBridge { public static void main(String[] args) { int exitCode = EXIT_CODE_FAILED; AtlasClientV2 atlasClientV2 = null; - KafkaBridge importer = null; + KafkaUtils kafkaUtils = null; try { Options options = new Options(); @@ -111,8 +111,9 @@ public class KafkaBridge { atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), urls); } - importer = new KafkaBridge(atlasConf, atlasClientV2); + kafkaUtils = new KafkaUtils(atlasConf); + KafkaBridge importer = new KafkaBridge(atlasConf, atlasClientV2, kafkaUtils); if (StringUtils.isNotEmpty(fileToImport)) { File f = new File(fileToImport); @@ -146,25 +147,19 @@ public class KafkaBridge { if (atlasClientV2 != null) { atlasClientV2.close(); } - if (importer != null) { - importer.close(); + if (kafkaUtils != null) { + kafkaUtils.close(); } } System.exit(exitCode); } - public KafkaBridge(Configuration atlasConf, AtlasClientV2 atlasClientV2) throws Exception { + public KafkaBridge(Configuration atlasConf, AtlasClientV2 atlasClientV2, KafkaUtils kafkaUtils) throws Exception { this.atlasClientV2 = atlasClientV2; this.metadataNamespace = getMetadataNamespace(atlasConf); - this.kafkaUtils = new KafkaUtils(atlasConf); - this.availableTopics = kafkaUtils.listAllTopics(); - } - - public void close() { - if (this.kafkaUtils != null) { - this.kafkaUtils.close(); - } + this.kafkaUtils = kafkaUtils; + this.availableTopics = this.kafkaUtils.listAllTopics(); } private String getMetadataNamespace(Configuration config) { diff --git a/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java b/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java index c8cc85c..f86ceb5 100644 --- a/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java +++ b/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java @@ -18,115 +18,127 @@ package org.apache.atlas.kafka.bridge; -import kafka.utils.ZkUtils; -import org.I0Itec.zkclient.ZkConnection; -import org.apache.atlas.AtlasClient; +import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClientV2; -import org.apache.atlas.AtlasServiceException; -import org.I0Itec.zkclient.ZkClient; -import org.apache.atlas.kafka.bridge.KafkaBridge; import org.apache.atlas.kafka.model.KafkaDataTypes; import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.EntityMutationResponse; -import org.mockito.Mock; +import org.apache.atlas.utils.KafkaUtils; +import org.mockito.ArgumentCaptor; import org.mockito.MockitoAnnotations; -import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import scala.Option; -import scala.collection.JavaConverters; -import java.util.ArrayList; import java.util.Collections; -import java.util.List; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; public class KafkaBridgeTest { private static final String TEST_TOPIC_NAME = "test_topic"; - public static final String CLUSTER_NAME = "primary"; - - @Mock - private ZkClient zkClient; - - @Mock - private ZkConnection zkConnection; - - @Mock - private AtlasClient atlasClient; - - @Mock - private AtlasClientV2 atlasClientV2; - - @Mock - private AtlasEntity atlasEntity; - - @Mock - EntityMutationResponse entityMutationResponse; - - @Mock - KafkaBridge kafkaBridge; + public static final AtlasEntity.AtlasEntityWithExtInfo TOPIC_WITH_EXT_INFO = new AtlasEntity.AtlasEntityWithExtInfo( + getTopicEntityWithGuid("0dd466a4-3838-4537-8969-6abb8b9e9185")); + private static final String CLUSTER_NAME = "primary"; + private static final String TOPIC_QUALIFIED_NAME = KafkaBridge.getTopicQualifiedName(CLUSTER_NAME, TEST_TOPIC_NAME); @BeforeMethod public void initializeMocks() { MockitoAnnotations.initMocks(this); } - - @Test - public void testImportTopic() throws Exception { - - List<String> topics = setupTopic(zkClient, TEST_TOPIC_NAME); - - AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = new AtlasEntity.AtlasEntityWithExtInfo( - getTopicEntityWithGuid("0dd466a4-3838-4537-8969-6abb8b9e9185")); - KafkaBridge kafkaBridge = mock(KafkaBridge.class); - when(kafkaBridge.createEntityInAtlas(atlasEntityWithExtInfo)).thenReturn(atlasEntityWithExtInfo); - - try { - kafkaBridge.importTopic(TEST_TOPIC_NAME); - } catch (Exception e) { - Assert.fail("KafkaBridge import failed ", e); - } - } - - private void returnExistingTopic(String topicName, AtlasClientV2 atlasClientV2, String clusterName) - throws AtlasServiceException { - - when(atlasClientV2.getEntityByAttribute(KafkaDataTypes.KAFKA_TOPIC.getName(), - Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, - getTopicQualifiedName(TEST_TOPIC_NAME,CLUSTER_NAME)))) - .thenReturn((new AtlasEntity.AtlasEntityWithExtInfo( - getTopicEntityWithGuid("0dd466a4-3838-4537-8969-6abb8b9e9185")))); - - } - - private List<String> setupTopic(ZkClient zkClient, String topicName) { - List<String> topics = new ArrayList<>(); - topics.add(topicName); - ZkUtils zkUtils = mock(ZkUtils.class); - when(zkUtils.getAllTopics()).thenReturn(JavaConverters.asScalaIteratorConverter(topics.iterator()).asScala().toSeq()); - return topics; - } - - private AtlasEntity getTopicEntityWithGuid(String guid) { + private static AtlasEntity getTopicEntityWithGuid(String guid) { AtlasEntity ret = new AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName()); ret.setGuid(guid); return ret; } - private AtlasEntity createTopicReference() { - AtlasEntity topicEntity = new AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName()); - return topicEntity; + @Test + public void testImportTopic() throws Exception { + KafkaUtils mockKafkaUtils = mock(KafkaUtils.class); + when(mockKafkaUtils.listAllTopics()) + .thenReturn(Collections.singletonList(TEST_TOPIC_NAME)); + when(mockKafkaUtils.getPartitionCount(TEST_TOPIC_NAME)) + .thenReturn(3); + + EntityMutationResponse mockCreateResponse = mock(EntityMutationResponse.class); + AtlasEntityHeader mockAtlasEntityHeader = mock(AtlasEntityHeader.class); + when(mockAtlasEntityHeader.getGuid()).thenReturn(TOPIC_WITH_EXT_INFO.getEntity().getGuid()); + when(mockCreateResponse.getCreatedEntities()) + .thenReturn(Collections.singletonList(mockAtlasEntityHeader)); + + AtlasClientV2 mockAtlasClientV2 = mock(AtlasClientV2.class); + when(mockAtlasClientV2.createEntity(any())) + .thenReturn(mockCreateResponse); + when(mockAtlasClientV2.getEntityByGuid(TOPIC_WITH_EXT_INFO.getEntity().getGuid())) + .thenReturn(TOPIC_WITH_EXT_INFO); + + KafkaBridge bridge = new KafkaBridge(ApplicationProperties.get(), mockAtlasClientV2, mockKafkaUtils); + bridge.importTopic(TEST_TOPIC_NAME); + + ArgumentCaptor<AtlasEntity.AtlasEntityWithExtInfo> argumentCaptor = ArgumentCaptor.forClass(AtlasEntity.AtlasEntityWithExtInfo.class); + verify(mockAtlasClientV2).createEntity(argumentCaptor.capture()); + AtlasEntity.AtlasEntityWithExtInfo entity = argumentCaptor.getValue(); + assertEquals(entity.getEntity().getAttribute("qualifiedName"), TOPIC_QUALIFIED_NAME); } - private String createTestTopic(String testTopic) { - return new String(testTopic); + @Test + public void testCreateTopic() throws Exception { + KafkaUtils mockKafkaUtils = mock(KafkaUtils.class); + when(mockKafkaUtils.listAllTopics()) + .thenReturn(Collections.singletonList(TEST_TOPIC_NAME)); + when(mockKafkaUtils.getPartitionCount(TEST_TOPIC_NAME)) + .thenReturn(3); + + EntityMutationResponse mockCreateResponse = mock(EntityMutationResponse.class); + AtlasEntityHeader mockAtlasEntityHeader = mock(AtlasEntityHeader.class); + when(mockAtlasEntityHeader.getGuid()).thenReturn(TOPIC_WITH_EXT_INFO.getEntity().getGuid()); + when(mockCreateResponse.getCreatedEntities()) + .thenReturn(Collections.singletonList(mockAtlasEntityHeader)); + + AtlasClientV2 mockAtlasClientV2 = mock(AtlasClientV2.class); + when(mockAtlasClientV2.createEntity(any())) + .thenReturn(mockCreateResponse); + when(mockAtlasClientV2.getEntityByGuid(TOPIC_WITH_EXT_INFO.getEntity().getGuid())) + .thenReturn(TOPIC_WITH_EXT_INFO); + + KafkaBridge bridge = new KafkaBridge(ApplicationProperties.get(), mockAtlasClientV2, mockKafkaUtils); + AtlasEntity.AtlasEntityWithExtInfo ret = bridge.createOrUpdateTopic(TEST_TOPIC_NAME); + + assertEquals(TOPIC_WITH_EXT_INFO, ret); } - private static String getTopicQualifiedName(String clusterName, String topic) { - return String.format("%s@%s", topic.toLowerCase(), clusterName); + @Test + public void testUpdateTopic() throws Exception { + KafkaUtils mockKafkaUtils = mock(KafkaUtils.class); + when(mockKafkaUtils.listAllTopics()) + .thenReturn(Collections.singletonList(TEST_TOPIC_NAME)); + when(mockKafkaUtils.getPartitionCount(TEST_TOPIC_NAME)) + .thenReturn(3); + + EntityMutationResponse mockUpdateResponse = mock(EntityMutationResponse.class); + AtlasEntityHeader mockAtlasEntityHeader = mock(AtlasEntityHeader.class); + when(mockAtlasEntityHeader.getGuid()).thenReturn(TOPIC_WITH_EXT_INFO.getEntity().getGuid()); + when(mockUpdateResponse.getUpdatedEntities()) + .thenReturn(Collections.singletonList(mockAtlasEntityHeader)); + + AtlasClientV2 mockAtlasClientV2 = mock(AtlasClientV2.class); + when(mockAtlasClientV2.getEntityByAttribute(eq(KafkaDataTypes.KAFKA_TOPIC.getName()), any())) + .thenReturn(TOPIC_WITH_EXT_INFO); + when(mockAtlasClientV2.updateEntity(any())) + .thenReturn(mockUpdateResponse); + when(mockAtlasClientV2.getEntityByGuid(TOPIC_WITH_EXT_INFO.getEntity().getGuid())) + .thenReturn(TOPIC_WITH_EXT_INFO); + + KafkaBridge bridge = new KafkaBridge(ApplicationProperties.get(), mockAtlasClientV2, mockKafkaUtils); + AtlasEntity.AtlasEntityWithExtInfo ret = bridge.createOrUpdateTopic(TEST_TOPIC_NAME); + + assertEquals(TOPIC_WITH_EXT_INFO, ret); } } \ No newline at end of file diff --git a/notification/pom.xml b/notification/pom.xml index 6afa22f..69ed245 100644 --- a/notification/pom.xml +++ b/notification/pom.xml @@ -83,11 +83,6 @@ </dependency> <dependency> - <groupId>com.101tec</groupId> - <artifactId>zkclient</artifactId> - </dependency> - - <dependency> <groupId>org.apache.atlas</groupId> <artifactId>atlas-intg</artifactId> <classifier>tests</classifier> @@ -179,11 +174,6 @@ <version>${kafka.version}</version> </artifactItem> <artifactItem> - <groupId>com.101tec</groupId> - <artifactId>zkclient</artifactId> - <version>${zkclient.version}</version> - </artifactItem> - <artifactItem> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> diff --git a/pom.xml b/pom.xml index 81719e5..34fbe6f 100644 --- a/pom.xml +++ b/pom.xml @@ -747,7 +747,6 @@ <testng.version>6.9.4</testng.version> <tinkerpop.version>3.4.6</tinkerpop.version> <woodstox-core.version>5.0.3</woodstox-core.version> - <zkclient.version>0.8</zkclient.version> <zookeeper.version>3.4.6</zookeeper.version> </properties> @@ -1657,12 +1656,6 @@ <version>${project.version}</version> </dependency> - <dependency> - <groupId>com.101tec</groupId> - <artifactId>zkclient</artifactId> - <version>${zkclient.version}</version> - </dependency> - <!-- Fix for cassandra-all tranitive dependency CVE-2017-18640 : https://nvd.nist.gov/vuln/detail/CVE-2017-18640 --> <dependency> <groupId>org.yaml</groupId>
