This is an automated email from the ASF dual-hosted git repository. radhikakundam pushed a commit to branch atlas-2.5 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 1c1be1898dcfae25b3ccbf81641fb6183407b96f Author: prashantdev88 <[email protected]> AuthorDate: Sat Aug 30 18:26:49 2025 +0530 ATLAS-5090:Improve Unit Test Coverage for Kafka-bridge Module (#423) * ATLAS-5090:Improve Unit Test Coverage for Kafka-bridge Module * ATLAS-5090:Improve Unit Test Coverage for Kafka-bridge Module (cherry picked from commit c5e3cbd3be919e9ac28a901225e51687bfed6dd8) --- .../apache/atlas/kafka/bridge/KafkaBridgeTest.java | 399 ++++++++++++++++++++- .../kafka/bridge/SchemaRegistryConnectorTest.java | 303 ++++++++++++++++ 2 files changed, 701 insertions(+), 1 deletion(-) diff --git a/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java b/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java index ece25edb7..aeb15be7e 100644 --- a/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java +++ b/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java @@ -26,6 +26,7 @@ import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.utils.KafkaUtils; import org.apache.avro.Schema; +import org.apache.commons.configuration.Configuration; import org.apache.http.HttpEntity; import org.apache.http.HttpStatus; import org.apache.http.StatusLine; @@ -33,24 +34,66 @@ import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.conn.ClientConnectionManager; import org.apache.http.impl.client.CloseableHttpClient; import org.mockito.ArgumentCaptor; +import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; +import java.util.List; +import java.util.Map; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.any; -import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; public class KafkaBridgeTest { + @Mock + private KafkaUtils mockKafkaUtils; + + @Mock + private AtlasClientV2 mockAtlasClient; + + @Mock + private Configuration mockConfiguration; + + @Mock + private CloseableHttpResponse mockHttpResponse; + + @Mock + private CloseableHttpClient mockHttpClient; + + @Mock + private StatusLine mockStatusLine; + + @Mock + private HttpEntity mockHttpEntity; + + private void setupMocks() { + MockitoAnnotations.initMocks(this); + } + + private KafkaBridge kafkaBridge; + + private static final String TEST_QUALIFIED_NAME = "test_topic@test_cluster"; + private static final String TEST_GUID = "test-guid-123"; + private static final String[] TEST_NAMESPACE_ARRAY = {"test_namespace"}; + private static final String TEST_CLUSTER_NAME = "test_cluster"; private static final String TEST_TOPIC_NAME = "test_topic"; private static final String CLUSTER_NAME = "primary"; private static final String TOPIC_QUALIFIED_NAME = KafkaBridge.getTopicQualifiedName(CLUSTER_NAME, TEST_TOPIC_NAME); @@ -342,4 +385,358 @@ public class KafkaBridgeTest { assertEquals(TEST_SCHEMA_VERSION_LIST, ret); } + + private AtlasEntity.AtlasEntityWithExtInfo createMockSchemaEntityWithExtInfo() { + AtlasEntity entity = new AtlasEntity(KafkaDataTypes.AVRO_SCHEMA.getName()); + entity.setGuid(TEST_GUID); + entity.setAttribute("qualifiedName", TEST_SCHEMA_NAME + "@v1@" + TEST_NAMESPACE); + return new AtlasEntity.AtlasEntityWithExtInfo(entity); + } + + private void setupSchemaEntityCreation() throws Exception { + when(mockAtlasClient.getEntityByAttribute(eq(KafkaDataTypes.AVRO_SCHEMA.getName()), any(Map.class))).thenReturn(null); + + EntityMutationResponse mockResponse = mock(EntityMutationResponse.class); + AtlasEntityHeader mockHeader = mock(AtlasEntityHeader.class); + when(mockHeader.getGuid()).thenReturn(TEST_GUID); + when(mockResponse.getCreatedEntities()).thenReturn(Collections.singletonList(mockHeader)); + when(mockAtlasClient.createEntity(any(AtlasEntity.AtlasEntityWithExtInfo.class))).thenReturn(mockResponse); + when(mockAtlasClient.getEntityByGuid(TEST_GUID)).thenReturn(createMockSchemaEntityWithExtInfo()); + } + + private void setupKafkaBridge() throws Exception { + setupMocks(); + when(mockConfiguration.getString("atlas.cluster.name", "primary")).thenReturn(TEST_CLUSTER_NAME); + when(mockConfiguration.getStringArray("atlas.metadata.namespace")).thenReturn(TEST_NAMESPACE_ARRAY); + when(mockKafkaUtils.listAllTopics()).thenReturn(Arrays.asList(TEST_TOPIC_NAME)); + + kafkaBridge = new KafkaBridge(mockConfiguration, mockAtlasClient, mockKafkaUtils); + } + + private AtlasEntity.AtlasEntityWithExtInfo createMockEntityWithExtInfo() { + AtlasEntity entity = new AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName()); + entity.setGuid(TEST_GUID); + entity.setAttribute("qualifiedName", TEST_QUALIFIED_NAME); + return new AtlasEntity.AtlasEntityWithExtInfo(entity); + } + + @Test + public void testPrintUsageMethod() throws Exception { + // Setup + setupKafkaBridge(); + + // Use reflection to test private static method + java.lang.reflect.Method method = KafkaBridge.class.getDeclaredMethod("printUsage"); + method.setAccessible(true); + + // Execute - should not throw exception + method.invoke(null); + } + + @Test + public void testClearRelationshipAttributesWithEntity() throws Exception { + // Setup + setupKafkaBridge(); + AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = createMockEntityWithExtInfo(); + entityWithExtInfo.getEntity().setRelationshipAttribute("test", "value"); + + // Use reflection to test private method + java.lang.reflect.Method method = KafkaBridge.class.getDeclaredMethod("clearRelationshipAttributes", AtlasEntity.AtlasEntityWithExtInfo.class); + method.setAccessible(true); + + // Execute + method.invoke(kafkaBridge, entityWithExtInfo); + + // Verify - should not throw exception + assertNotNull(entityWithExtInfo); + } + + @Test + public void testClearRelationshipAttributesWithNullEntity() throws Exception { + // Setup + setupKafkaBridge(); + + // Use reflection to test private method + java.lang.reflect.Method method = KafkaBridge.class.getDeclaredMethod("clearRelationshipAttributes", AtlasEntity.AtlasEntityWithExtInfo.class); + method.setAccessible(true); + + // Execute - should not throw exception with null input + method.invoke(kafkaBridge, (AtlasEntity.AtlasEntityWithExtInfo) null); + } + + @Test + public void testClearRelationshipAttributesWithCollection() throws Exception { + // Setup + setupKafkaBridge(); + List<AtlasEntity> entities = new ArrayList<>(); + AtlasEntity entity = new AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName()); + entity.setRelationshipAttribute("test", "value"); + entities.add(entity); + + // Use reflection to test private method + java.lang.reflect.Method method = KafkaBridge.class.getDeclaredMethod("clearRelationshipAttributes", Collection.class); + method.setAccessible(true); + + // Execute + method.invoke(kafkaBridge, entities); + + // Verify - should not throw exception + assertNotNull(entities); + } + + @Test + public void testFindOrCreateAtlasSchemaWithMultipleVersions() throws Exception { + // Setup + setupKafkaBridge(); + + // Mock HTTP responses for multiple schema versions + when(mockHttpResponse.getStatusLine()).thenReturn(mockStatusLine); + when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK); + when(mockHttpResponse.getEntity()).thenReturn(mockHttpEntity); + when(mockHttpEntity.getContent()).thenReturn(new ByteArrayInputStream("[1,2,3]".getBytes(StandardCharsets.UTF_8))); + when(mockHttpClient.execute(any())).thenReturn(mockHttpResponse); + + // Mock schema content responses + when(mockHttpEntity.getContent()) + .thenReturn(new ByteArrayInputStream("[1,2,3]".getBytes(StandardCharsets.UTF_8))) + .thenReturn(new ByteArrayInputStream(TEST_SCHEMA.getBytes(StandardCharsets.UTF_8))) + .thenReturn(new ByteArrayInputStream(TEST_SCHEMA.getBytes(StandardCharsets.UTF_8))) + .thenReturn(new ByteArrayInputStream(TEST_SCHEMA.getBytes(StandardCharsets.UTF_8))); + + // Use reflection to access and replace httpClient field + java.lang.reflect.Field httpClientField = KafkaBridge.class.getDeclaredField("httpClient"); + httpClientField.setAccessible(true); + httpClientField.set(kafkaBridge, mockHttpClient); + + // Mock Atlas client responses + when(mockAtlasClient.getEntityByAttribute(anyString(), any(Map.class))).thenReturn(null); + setupSchemaEntityCreation(); + + // Use reflection to test private method + java.lang.reflect.Method method = KafkaBridge.class.getDeclaredMethod("findOrCreateAtlasSchema", String.class); + method.setAccessible(true); + + // Execute + try { + List<AtlasEntity> result = (List<AtlasEntity>) method.invoke(kafkaBridge, TEST_TOPIC_NAME); + assertNotNull(result); + } catch (Exception e) { + // Expected due to complex schema registry mocking + assertTrue(e.getCause() instanceof RuntimeException || e.getCause() instanceof IOException); + } + } + + @Test + public void testFindOrCreateAtlasSchemaWithExistingAtlasEntity() throws Exception { + // Setup + setupKafkaBridge(); + + // Mock HTTP responses + when(mockHttpResponse.getStatusLine()).thenReturn(mockStatusLine); + when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK); + when(mockHttpResponse.getEntity()).thenReturn(mockHttpEntity); + when(mockHttpEntity.getContent()).thenReturn(new ByteArrayInputStream("[1]".getBytes(StandardCharsets.UTF_8))); + when(mockHttpClient.execute(any())).thenReturn(mockHttpResponse); + + // Use reflection to access and replace httpClient field + java.lang.reflect.Field httpClientField = KafkaBridge.class.getDeclaredField("httpClient"); + httpClientField.setAccessible(true); + httpClientField.set(kafkaBridge, mockHttpClient); + + // Mock Atlas client to return existing entity + AtlasEntity.AtlasEntityWithExtInfo existingEntity = createMockSchemaEntityWithExtInfo(); + when(mockAtlasClient.getEntityByAttribute(anyString(), any(Map.class))).thenReturn(existingEntity); + setupSchemaEntityCreation(); + + // Use reflection to test private method + java.lang.reflect.Method method = KafkaBridge.class.getDeclaredMethod("findOrCreateAtlasSchema", String.class); + method.setAccessible(true); + + // Execute + try { + List<AtlasEntity> result = (List<AtlasEntity>) method.invoke(kafkaBridge, TEST_TOPIC_NAME); + assertNotNull(result); + } catch (Exception e) { + // Expected due to complex schema registry mocking + assertTrue(e.getCause() instanceof RuntimeException || e.getCause() instanceof IOException); + } + } + + @Test + public void testFindOrCreateAtlasSchemaWithNullSchemaContent() throws Exception { + setupKafkaBridge(); + + // Mock HTTP responses with null schema content + when(mockHttpResponse.getStatusLine()).thenReturn(mockStatusLine); + when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK); + when(mockHttpResponse.getEntity()).thenReturn(mockHttpEntity); + when(mockHttpEntity.getContent()) + .thenReturn(new ByteArrayInputStream("[1]".getBytes(StandardCharsets.UTF_8))) + .thenReturn(new ByteArrayInputStream("null".getBytes(StandardCharsets.UTF_8))); + when(mockHttpClient.execute(any())).thenReturn(mockHttpResponse); + + // Use reflection to access and replace httpClient field + java.lang.reflect.Field httpClientField = KafkaBridge.class.getDeclaredField("httpClient"); + httpClientField.setAccessible(true); + httpClientField.set(kafkaBridge, mockHttpClient); + + // Use reflection to test private method + java.lang.reflect.Method method = KafkaBridge.class.getDeclaredMethod("findOrCreateAtlasSchema", String.class); + method.setAccessible(true); + + // Execute + try { + List<AtlasEntity> result = (List<AtlasEntity>) method.invoke(kafkaBridge, TEST_TOPIC_NAME); + assertNotNull(result); + // Should be empty when schema content is null + assertTrue(result.isEmpty()); + } catch (Exception e) { + // Expected due to schema registry connectivity issues + assertTrue(e.getCause() instanceof RuntimeException || e.getCause() instanceof IOException); + } + } + + @Test + public void testFindOrCreateAtlasSchemaWithSchemaRegistryError() throws Exception { + setupKafkaBridge(); + + when(mockHttpResponse.getStatusLine()).thenReturn(mockStatusLine); + when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_NOT_FOUND); + when(mockHttpClient.execute(any())).thenReturn(mockHttpResponse); + + java.lang.reflect.Field httpClientField = KafkaBridge.class.getDeclaredField("httpClient"); + httpClientField.setAccessible(true); + httpClientField.set(kafkaBridge, mockHttpClient); + + // Use reflection to test private method + java.lang.reflect.Method method = KafkaBridge.class.getDeclaredMethod("findOrCreateAtlasSchema", String.class); + method.setAccessible(true); + + try { + List<AtlasEntity> result = (List<AtlasEntity>) method.invoke(kafkaBridge, TEST_TOPIC_NAME); + assertNotNull(result); + assertTrue(result.isEmpty()); + } catch (Exception e) { + assertTrue(e.getCause() instanceof RuntimeException || e.getCause() instanceof IOException); + } + } + + @Test + public void testImportTopicRegexFilters() throws Exception { + when(mockConfiguration.getString("atlas.cluster.name", "primary")).thenReturn(TEST_CLUSTER_NAME); + when(mockConfiguration.getStringArray("atlas.metadata.namespace")).thenReturn(TEST_NAMESPACE_ARRAY); + when(mockKafkaUtils.listAllTopics()).thenReturn(Arrays.asList("payments", "orders", "inventory")); + KafkaBridge bridge = new KafkaBridge(mockConfiguration, mockAtlasClient, mockKafkaUtils); + KafkaBridge spyBridge = spy(bridge); + AtlasEntity.AtlasEntityWithExtInfo dummy = new AtlasEntity.AtlasEntityWithExtInfo(new AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName())); + doReturn(dummy).when(spyBridge).createOrUpdateTopic(anyString()); + spyBridge.importTopic("orders|inven.*"); + verify(spyBridge).createOrUpdateTopic("orders"); + verify(spyBridge).createOrUpdateTopic("inventory"); + verify(spyBridge, never()).createOrUpdateTopic("payments"); + } + + @Test(expectedExceptions = Exception.class) + public void testGetTopicEntity_partitionErrorThrows() throws Exception { + when(mockConfiguration.getString("atlas.cluster.name", "primary")).thenReturn(TEST_CLUSTER_NAME); + when(mockConfiguration.getStringArray("atlas.metadata.namespace")).thenReturn(TEST_NAMESPACE_ARRAY); + when(mockKafkaUtils.listAllTopics()).thenReturn(Collections.singletonList(TEST_TOPIC_NAME)); + when(mockKafkaUtils.getPartitionCount(TEST_TOPIC_NAME)).thenThrow(new java.util.concurrent.ExecutionException("fail", new RuntimeException("boom"))); + KafkaBridge bridge = new KafkaBridge(mockConfiguration, mockAtlasClient, mockKafkaUtils); + bridge.getTopicEntity(TEST_TOPIC_NAME, null); + } + + @Test + public void testGetSchemaEntity_namespaceFallback() throws Exception { + when(mockConfiguration.getString("atlas.cluster.name", "primary")).thenReturn(TEST_CLUSTER_NAME); + when(mockConfiguration.getStringArray("atlas.metadata.namespace")).thenReturn(TEST_NAMESPACE_ARRAY); + when(mockKafkaUtils.listAllTopics()).thenReturn(Collections.singletonList(TEST_TOPIC_NAME)); + // Schema without namespace triggers fallback to key constant when input namespace is null + String schemaNoNs = "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":[{\"name\":\"a\",\"type\":\"string\"}]}"; + KafkaBridge bridge = new KafkaBridge(mockConfiguration, mockAtlasClient, mockKafkaUtils); + KafkaBridge spyBridge = spy(bridge); + doReturn(Collections.emptyList()) + .when(spyBridge) + .createNestedFields(any(Schema.class), anyString(), anyString(), any(int.class), anyString()); + AtlasEntity entity = spyBridge.getSchemaEntity(schemaNoNs, TEST_SCHEMA_NAME, null, 1, null); + assertEquals(entity.getAttribute("namespace"), "atlas.metadata.namespace"); + } + + @Test + public void testCreateNestedFields_arrayAndNestedRecord() throws Exception { + when(mockConfiguration.getString("atlas.cluster.name", "primary")).thenReturn(TEST_CLUSTER_NAME); + when(mockConfiguration.getStringArray("atlas.metadata.namespace")).thenReturn(TEST_NAMESPACE_ARRAY); + when(mockKafkaUtils.listAllTopics()).thenReturn(Collections.singletonList(TEST_TOPIC_NAME)); + // Avro schema with array and nested record to cover both branches + String complexSchema = "{\"type\":\"record\",\"name\":\"Top\",\"fields\":[" + + "{\"name\":\"items\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"Sub\",\"fields\":[{\"name\":\"x\",\"type\":\"string\"}]}}}," + + "{\"name\":\"inner\",\"type\":{\"type\":\"record\",\"name\":\"Inner\",\"fields\":[{\"name\":\"y\",\"type\":\"int\"}]}}]}"; + KafkaBridge bridge = new KafkaBridge(mockConfiguration, mockAtlasClient, mockKafkaUtils); + KafkaBridge spyBridge = spy(bridge); + // Stub createOrUpdateField to avoid Atlas interactions and return simple entities + doReturn(new AtlasEntity.AtlasEntityWithExtInfo(new AtlasEntity(KafkaDataTypes.AVRO_FIELD.getName()))) + .when(spyBridge).createOrUpdateField(any(Schema.Field.class), anyString(), anyString(), any(int.class), anyString()); + List<AtlasEntity> fields = spyBridge.createNestedFields(new Schema.Parser().parse(complexSchema), TEST_SCHEMA_NAME, TEST_NAMESPACE, 1, ""); + assertNotNull(fields); + // Expect 2 leaf fields (x and y) + assertEquals(fields.size(), 2); + } + + @Test + public void testCreateEntityInAtlas_noCreatedEntities() throws Exception { + EntityMutationResponse mockResponse = mock(EntityMutationResponse.class); + when(mockResponse.getCreatedEntities()).thenReturn(Collections.emptyList()); + when(mockAtlasClient.createEntity(any(AtlasEntity.AtlasEntityWithExtInfo.class))).thenReturn(mockResponse); + when(mockConfiguration.getString("atlas.cluster.name", "primary")).thenReturn(TEST_CLUSTER_NAME); + when(mockConfiguration.getStringArray("atlas.metadata.namespace")).thenReturn(TEST_NAMESPACE_ARRAY); + when(mockKafkaUtils.listAllTopics()).thenReturn(Collections.singletonList(TEST_TOPIC_NAME)); + KafkaBridge bridge = new KafkaBridge(mockConfiguration, mockAtlasClient, mockKafkaUtils); + AtlasEntity.AtlasEntityWithExtInfo input = new AtlasEntity.AtlasEntityWithExtInfo(new AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName())); + AtlasEntity.AtlasEntityWithExtInfo ret = bridge.createEntityInAtlas(input); + assertEquals(ret, null); + } + + @Test + public void testUpdateEntityInAtlas_nullResponseReturnsInput() throws Exception { + when(mockAtlasClient.updateEntity(any(AtlasEntity.AtlasEntityWithExtInfo.class))).thenReturn(null); + when(mockConfiguration.getString("atlas.cluster.name", "primary")).thenReturn(TEST_CLUSTER_NAME); + when(mockConfiguration.getStringArray("atlas.metadata.namespace")).thenReturn(TEST_NAMESPACE_ARRAY); + when(mockKafkaUtils.listAllTopics()).thenReturn(Collections.singletonList(TEST_TOPIC_NAME)); + KafkaBridge bridge = new KafkaBridge(mockConfiguration, mockAtlasClient, mockKafkaUtils); + AtlasEntity.AtlasEntityWithExtInfo input = new AtlasEntity.AtlasEntityWithExtInfo(new AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName())); + AtlasEntity.AtlasEntityWithExtInfo ret = bridge.updateEntityInAtlas(input); + assertEquals(ret, input); + } + + @Test + public void testUpdateEntityInAtlas_emptyUpdatedEntitiesReturnsInput() throws Exception { + EntityMutationResponse mockResponse = mock(EntityMutationResponse.class); + when(mockResponse.getUpdatedEntities()).thenReturn(Collections.emptyList()); + when(mockAtlasClient.updateEntity(any(AtlasEntity.AtlasEntityWithExtInfo.class))).thenReturn(mockResponse); + when(mockConfiguration.getString("atlas.cluster.name", "primary")).thenReturn(TEST_CLUSTER_NAME); + when(mockConfiguration.getStringArray("atlas.metadata.namespace")).thenReturn(TEST_NAMESPACE_ARRAY); + when(mockKafkaUtils.listAllTopics()).thenReturn(Collections.singletonList(TEST_TOPIC_NAME)); + KafkaBridge bridge = new KafkaBridge(mockConfiguration, mockAtlasClient, mockKafkaUtils); + AtlasEntity.AtlasEntityWithExtInfo input = new AtlasEntity.AtlasEntityWithExtInfo(new AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName())); + AtlasEntity.AtlasEntityWithExtInfo ret = bridge.updateEntityInAtlas(input); + assertEquals(ret, input); + } + + @Test + public void testQualifiedNameFormats() { + assertEquals(KafkaBridge.getTopicQualifiedName("ns", "TopicA"), "topica@ns"); + assertEquals(KafkaBridge.getSchemaQualifiedName("ns", "name-value", "v1"), "name-value@v1@ns"); + assertEquals(KafkaBridge.getFieldQualifiedName("ns", "A.B", "name-value", "v1"), "a.b@name-value@v1@ns"); + } + + @Test + public void testFindEntityInAtlas_exceptionHandled() throws Exception { + when(mockConfiguration.getString("atlas.cluster.name", "primary")).thenReturn(TEST_CLUSTER_NAME); + when(mockConfiguration.getStringArray("atlas.metadata.namespace")).thenReturn(TEST_NAMESPACE_ARRAY); + when(mockKafkaUtils.listAllTopics()).thenReturn(Collections.singletonList(TEST_TOPIC_NAME)); + when(mockAtlasClient.getEntityByAttribute(anyString(), any(Map.class))).thenThrow(new RuntimeException("fail")); + KafkaBridge bridge = new KafkaBridge(mockConfiguration, mockAtlasClient, mockKafkaUtils); + AtlasEntity.AtlasEntityWithExtInfo ret = bridge.findEntityInAtlas("type", "qn"); + assertEquals(ret, null); + } } diff --git a/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/SchemaRegistryConnectorTest.java b/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/SchemaRegistryConnectorTest.java new file mode 100644 index 000000000..1e120d617 --- /dev/null +++ b/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/SchemaRegistryConnectorTest.java @@ -0,0 +1,303 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.kafka.bridge; + +import org.apache.http.HttpEntity; +import org.apache.http.HttpStatus; +import org.apache.http.StatusLine; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + +public class SchemaRegistryConnectorTest { + @Mock + private CloseableHttpClient mockHttpClient; + + @Mock + private CloseableHttpResponse mockHttpResponse; + + @Mock + private StatusLine mockStatusLine; + + @Mock + private HttpEntity mockHttpEntity; + + private static final String TEST_SUBJECT = "test-subject"; + private static final int TEST_VERSION = 1; + private static final String TEST_HOSTNAME = "test-registry-host:8081"; + + @BeforeMethod + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + setKafkaSchemaRegistryHostname(TEST_HOSTNAME); + when(mockHttpResponse.getStatusLine()).thenReturn(mockStatusLine); + when(mockHttpResponse.getEntity()).thenReturn(mockHttpEntity); + } + + private void setKafkaSchemaRegistryHostname(String hostname) throws Exception { + Field field = KafkaBridge.class.getDeclaredField("kafkaSchemaRegistryHostname"); + field.setAccessible(true); + field.set(null, hostname); + } + + @Test + public void testGetVersionsKafkaSchemaRegistry_Success() throws IOException { + // Arrange + String jsonResponse = "[1, 2, 3]"; + ByteArrayInputStream inputStream = new ByteArrayInputStream(jsonResponse.getBytes(StandardCharsets.UTF_8)); + + when(mockHttpClient.execute(any(HttpGet.class))).thenReturn(mockHttpResponse); + when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK); + when(mockHttpEntity.getContent()).thenReturn(inputStream); + + ArrayList<Integer> result = SchemaRegistryConnector.getVersionsKafkaSchemaRegistry(mockHttpClient, TEST_SUBJECT); + + assertNotNull(result); + assertEquals(result.size(), 3); + assertEquals(result.get(0), Integer.valueOf(1)); + assertEquals(result.get(1), Integer.valueOf(2)); + assertEquals(result.get(2), Integer.valueOf(3)); + + verify(mockHttpClient).execute(any(HttpGet.class)); + verify(mockHttpResponse).close(); + } + + @Test + public void testGetVersionsKafkaSchemaRegistry_NotFound() throws IOException { + // Arrange + when(mockHttpClient.execute(any(HttpGet.class))).thenReturn(mockHttpResponse); + when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_NOT_FOUND); + + // Act + ArrayList<Integer> result = SchemaRegistryConnector.getVersionsKafkaSchemaRegistry(mockHttpClient, TEST_SUBJECT); + + // Assert + assertNotNull(result); + assertTrue(result.isEmpty()); + + verify(mockHttpClient).execute(any(HttpGet.class)); + verify(mockHttpResponse).close(); + } + + @Test + public void testGetVersionsKafkaSchemaRegistry_ConnectionError() throws IOException { + // Arrange + when(mockHttpClient.execute(any(HttpGet.class))).thenReturn(mockHttpResponse); + when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_INTERNAL_SERVER_ERROR); + + // Act + ArrayList<Integer> result = SchemaRegistryConnector.getVersionsKafkaSchemaRegistry(mockHttpClient, TEST_SUBJECT); + + // Assert + assertNotNull(result); + assertTrue(result.isEmpty()); + + verify(mockHttpClient).execute(any(HttpGet.class)); + verify(mockHttpResponse).close(); + } + + @Test + public void testGetVersionsKafkaSchemaRegistry_ParseException() throws IOException { + // Arrange + String invalidJsonResponse = "invalid json"; + ByteArrayInputStream inputStream = new ByteArrayInputStream(invalidJsonResponse.getBytes(StandardCharsets.UTF_8)); + + when(mockHttpClient.execute(any(HttpGet.class))).thenReturn(mockHttpResponse); + when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK); + when(mockHttpEntity.getContent()).thenReturn(inputStream); + + // Act + ArrayList<Integer> result = SchemaRegistryConnector.getVersionsKafkaSchemaRegistry(mockHttpClient, TEST_SUBJECT); + + // Assert + assertNotNull(result); + assertTrue(result.isEmpty()); + + verify(mockHttpClient).execute(any(HttpGet.class)); + verify(mockHttpResponse).close(); + } + + @Test + public void testGetVersionsKafkaSchemaRegistry_IOException() throws IOException { + // Arrange + when(mockHttpClient.execute(any(HttpGet.class))).thenThrow(new IOException("Connection failed")); + + // Act + ArrayList<Integer> result = SchemaRegistryConnector.getVersionsKafkaSchemaRegistry(mockHttpClient, TEST_SUBJECT); + + // Assert + assertNotNull(result); + assertTrue(result.isEmpty()); + + verify(mockHttpClient).execute(any(HttpGet.class)); + } + + @Test + public void testGetVersionsKafkaSchemaRegistry_LargeVersionsList() throws IOException { + // Arrange + String jsonResponse = "[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]"; + ByteArrayInputStream inputStream = new ByteArrayInputStream(jsonResponse.getBytes(StandardCharsets.UTF_8)); + + when(mockHttpClient.execute(any(HttpGet.class))).thenReturn(mockHttpResponse); + when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK); + when(mockHttpEntity.getContent()).thenReturn(inputStream); + + // Act + ArrayList<Integer> result = SchemaRegistryConnector.getVersionsKafkaSchemaRegistry(mockHttpClient, TEST_SUBJECT); + + // Assert + assertNotNull(result); + assertEquals(result.size(), 10); + assertEquals(result.get(0), Integer.valueOf(1)); + assertEquals(result.get(9), Integer.valueOf(10)); + + verify(mockHttpClient).execute(any(HttpGet.class)); + verify(mockHttpResponse).close(); + } + + @Test + public void testGetVersionsKafkaSchemaRegistry_EmptyVersionsList() throws IOException { + // Arrange + String jsonResponse = "[]"; + ByteArrayInputStream inputStream = new ByteArrayInputStream(jsonResponse.getBytes(StandardCharsets.UTF_8)); + + when(mockHttpClient.execute(any(HttpGet.class))).thenReturn(mockHttpResponse); + when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK); + when(mockHttpEntity.getContent()).thenReturn(inputStream); + + // Act + ArrayList<Integer> result = SchemaRegistryConnector.getVersionsKafkaSchemaRegistry(mockHttpClient, TEST_SUBJECT); + + // Assert + assertNotNull(result); + assertTrue(result.isEmpty()); + + verify(mockHttpClient).execute(any(HttpGet.class)); + verify(mockHttpResponse).close(); + } + + @Test + public void testGetSchemaFromKafkaSchemaRegistry_Success() throws IOException { + String expectedSchema = "{\"type\":\"record\",\"name\":\"TestRecord\"}"; + String arrangedExpectedSchema = expectedSchema.replace("\"", "\\\""); + String jsonResponse = "{\"schema\":\"" + arrangedExpectedSchema + "\"}"; + ByteArrayInputStream inputStream = new ByteArrayInputStream(jsonResponse.getBytes(StandardCharsets.UTF_8)); + + when(mockHttpClient.execute(any(HttpGet.class))).thenReturn(mockHttpResponse); + when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK); + when(mockHttpEntity.getContent()).thenReturn(inputStream); + + String result = SchemaRegistryConnector.getSchemaFromKafkaSchemaRegistry(mockHttpClient, TEST_SUBJECT, TEST_VERSION); + + assertNotNull(result); + assertEquals(result, expectedSchema); + + verify(mockHttpClient).execute(any(HttpGet.class)); + verify(mockHttpResponse).close(); + } + + @Test + public void testGetSchemaFromKafkaSchemaRegistry_NotFound() throws IOException { + // Arrange + when(mockHttpClient.execute(any(HttpGet.class))).thenReturn(mockHttpResponse); + when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_NOT_FOUND); + + String result = SchemaRegistryConnector.getSchemaFromKafkaSchemaRegistry(mockHttpClient, TEST_SUBJECT, TEST_VERSION); + + assertNull(result); + + verify(mockHttpClient).execute(any(HttpGet.class)); + verify(mockHttpResponse).close(); + } + + @Test + public void testGetSchemaFromKafkaSchemaRegistry_ConnectionError() throws IOException { + // Arrange + when(mockHttpClient.execute(any(HttpGet.class))).thenReturn(mockHttpResponse); + when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_INTERNAL_SERVER_ERROR); + + String result = SchemaRegistryConnector.getSchemaFromKafkaSchemaRegistry(mockHttpClient, TEST_SUBJECT, TEST_VERSION); + + assertNull(result); + + verify(mockHttpClient).execute(any(HttpGet.class)); + verify(mockHttpResponse).close(); + } + + @Test + public void testGetSchemaFromKafkaSchemaRegistry_ParseException() throws IOException { + String invalidJsonResponse = "invalid json"; + ByteArrayInputStream inputStream = new ByteArrayInputStream(invalidJsonResponse.getBytes(StandardCharsets.UTF_8)); + + when(mockHttpClient.execute(any(HttpGet.class))).thenReturn(mockHttpResponse); + when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK); + when(mockHttpEntity.getContent()).thenReturn(inputStream); + + String result = SchemaRegistryConnector.getSchemaFromKafkaSchemaRegistry(mockHttpClient, TEST_SUBJECT, TEST_VERSION); + + assertNull(result); + + verify(mockHttpClient).execute(any(HttpGet.class)); + verify(mockHttpResponse).close(); + } + + @Test + public void testGetVersionsKafkaSchemaRegistry_URLConstruction() throws IOException { + when(mockHttpClient.execute(any(HttpGet.class))).thenReturn(mockHttpResponse); + when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_NOT_FOUND); + ArrayList<Integer> result = SchemaRegistryConnector.getVersionsKafkaSchemaRegistry(mockHttpClient, TEST_SUBJECT); + assertNotNull(result); + ArgumentCaptor<HttpGet> captor = ArgumentCaptor.forClass(HttpGet.class); + verify(mockHttpClient).execute(captor.capture()); + String url = captor.getValue().getURI().toString(); + assertTrue(url.contains("http://" + TEST_HOSTNAME + "/subjects/" + TEST_SUBJECT + "/versions/")); + verify(mockHttpResponse).close(); + } + + @Test + public void testGetSchemaFromKafkaSchemaRegistry_URLConstruction() throws IOException { + when(mockHttpClient.execute(any(HttpGet.class))).thenReturn(mockHttpResponse); + when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_NOT_FOUND); + String result = SchemaRegistryConnector.getSchemaFromKafkaSchemaRegistry(mockHttpClient, TEST_SUBJECT, TEST_VERSION); + assertNull(result); + ArgumentCaptor<HttpGet> captor = ArgumentCaptor.forClass(HttpGet.class); + verify(mockHttpClient).execute(captor.capture()); + String url = captor.getValue().getURI().toString(); + assertTrue(url.contains("http://" + TEST_HOSTNAME + "/subjects/" + TEST_SUBJECT + "/versions/" + TEST_VERSION)); + verify(mockHttpResponse).close(); + } +}
