This is an automated email from the ASF dual-hosted git repository.

radhikakundam pushed a commit to branch atlas-2.5
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 1c1be1898dcfae25b3ccbf81641fb6183407b96f
Author: prashantdev88 <[email protected]>
AuthorDate: Sat Aug 30 18:26:49 2025 +0530

    ATLAS-5090:Improve Unit Test Coverage for Kafka-bridge Module (#423)
    
    * ATLAS-5090:Improve Unit Test Coverage for Kafka-bridge Module
    
    * ATLAS-5090:Improve Unit Test Coverage for Kafka-bridge Module
    
    (cherry picked from commit c5e3cbd3be919e9ac28a901225e51687bfed6dd8)
---
 .../apache/atlas/kafka/bridge/KafkaBridgeTest.java | 399 ++++++++++++++++++++-
 .../kafka/bridge/SchemaRegistryConnectorTest.java  | 303 ++++++++++++++++
 2 files changed, 701 insertions(+), 1 deletion(-)

diff --git 
a/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java
 
b/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java
index ece25edb7..aeb15be7e 100644
--- 
a/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java
+++ 
b/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java
@@ -26,6 +26,7 @@ import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.utils.KafkaUtils;
 import org.apache.avro.Schema;
+import org.apache.commons.configuration.Configuration;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpStatus;
 import org.apache.http.StatusLine;
@@ -33,24 +34,66 @@ import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.conn.ClientConnectionManager;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 
 public class KafkaBridgeTest {
+    @Mock
+    private KafkaUtils mockKafkaUtils;
+
+    @Mock
+    private AtlasClientV2 mockAtlasClient;
+
+    @Mock
+    private Configuration mockConfiguration;
+
+    @Mock
+    private CloseableHttpResponse mockHttpResponse;
+
+    @Mock
+    private CloseableHttpClient mockHttpClient;
+
+    @Mock
+    private StatusLine mockStatusLine;
+
+    @Mock
+    private HttpEntity mockHttpEntity;
+
+    private void setupMocks() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    private KafkaBridge kafkaBridge;
+
+    private static final String TEST_QUALIFIED_NAME = 
"test_topic@test_cluster";
+    private static final String TEST_GUID = "test-guid-123";
+    private static final String[] TEST_NAMESPACE_ARRAY = {"test_namespace"};
+    private static final String TEST_CLUSTER_NAME = "test_cluster";
     private static final String TEST_TOPIC_NAME = "test_topic";
     private static final String CLUSTER_NAME = "primary";
     private static final String TOPIC_QUALIFIED_NAME = 
KafkaBridge.getTopicQualifiedName(CLUSTER_NAME, TEST_TOPIC_NAME);
@@ -342,4 +385,358 @@ public class KafkaBridgeTest {
 
         assertEquals(TEST_SCHEMA_VERSION_LIST, ret);
     }
+
+    private AtlasEntity.AtlasEntityWithExtInfo 
createMockSchemaEntityWithExtInfo() {
+        AtlasEntity entity = new 
AtlasEntity(KafkaDataTypes.AVRO_SCHEMA.getName());
+        entity.setGuid(TEST_GUID);
+        entity.setAttribute("qualifiedName", TEST_SCHEMA_NAME + "@v1@" + 
TEST_NAMESPACE);
+        return new AtlasEntity.AtlasEntityWithExtInfo(entity);
+    }
+
+    private void setupSchemaEntityCreation() throws Exception {
+        
when(mockAtlasClient.getEntityByAttribute(eq(KafkaDataTypes.AVRO_SCHEMA.getName()),
 any(Map.class))).thenReturn(null);
+
+        EntityMutationResponse mockResponse = 
mock(EntityMutationResponse.class);
+        AtlasEntityHeader mockHeader = mock(AtlasEntityHeader.class);
+        when(mockHeader.getGuid()).thenReturn(TEST_GUID);
+        
when(mockResponse.getCreatedEntities()).thenReturn(Collections.singletonList(mockHeader));
+        
when(mockAtlasClient.createEntity(any(AtlasEntity.AtlasEntityWithExtInfo.class))).thenReturn(mockResponse);
+        
when(mockAtlasClient.getEntityByGuid(TEST_GUID)).thenReturn(createMockSchemaEntityWithExtInfo());
+    }
+
+    private void setupKafkaBridge() throws Exception {
+        setupMocks();
+        when(mockConfiguration.getString("atlas.cluster.name", 
"primary")).thenReturn(TEST_CLUSTER_NAME);
+        
when(mockConfiguration.getStringArray("atlas.metadata.namespace")).thenReturn(TEST_NAMESPACE_ARRAY);
+        
when(mockKafkaUtils.listAllTopics()).thenReturn(Arrays.asList(TEST_TOPIC_NAME));
+
+        kafkaBridge = new KafkaBridge(mockConfiguration, mockAtlasClient, 
mockKafkaUtils);
+    }
+
+    private AtlasEntity.AtlasEntityWithExtInfo createMockEntityWithExtInfo() {
+        AtlasEntity entity = new 
AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName());
+        entity.setGuid(TEST_GUID);
+        entity.setAttribute("qualifiedName", TEST_QUALIFIED_NAME);
+        return new AtlasEntity.AtlasEntityWithExtInfo(entity);
+    }
+
+    @Test
+    public void testPrintUsageMethod() throws Exception {
+        // Setup
+        setupKafkaBridge();
+
+        // Use reflection to test private static method
+        java.lang.reflect.Method method = 
KafkaBridge.class.getDeclaredMethod("printUsage");
+        method.setAccessible(true);
+
+        // Execute - should not throw exception
+        method.invoke(null);
+    }
+
+    @Test
+    public void testClearRelationshipAttributesWithEntity() throws Exception {
+        // Setup
+        setupKafkaBridge();
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = 
createMockEntityWithExtInfo();
+        entityWithExtInfo.getEntity().setRelationshipAttribute("test", 
"value");
+
+        // Use reflection to test private method
+        java.lang.reflect.Method method = 
KafkaBridge.class.getDeclaredMethod("clearRelationshipAttributes", 
AtlasEntity.AtlasEntityWithExtInfo.class);
+        method.setAccessible(true);
+
+        // Execute
+        method.invoke(kafkaBridge, entityWithExtInfo);
+
+        // Verify - should not throw exception
+        assertNotNull(entityWithExtInfo);
+    }
+
+    @Test
+    public void testClearRelationshipAttributesWithNullEntity() throws 
Exception {
+        // Setup
+        setupKafkaBridge();
+
+        // Use reflection to test private method
+        java.lang.reflect.Method method = 
KafkaBridge.class.getDeclaredMethod("clearRelationshipAttributes", 
AtlasEntity.AtlasEntityWithExtInfo.class);
+        method.setAccessible(true);
+
+        // Execute - should not throw exception with null input
+        method.invoke(kafkaBridge, (AtlasEntity.AtlasEntityWithExtInfo) null);
+    }
+
+    @Test
+    public void testClearRelationshipAttributesWithCollection() throws 
Exception {
+        // Setup
+        setupKafkaBridge();
+        List<AtlasEntity> entities = new ArrayList<>();
+        AtlasEntity entity = new 
AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName());
+        entity.setRelationshipAttribute("test", "value");
+        entities.add(entity);
+
+        // Use reflection to test private method
+        java.lang.reflect.Method method = 
KafkaBridge.class.getDeclaredMethod("clearRelationshipAttributes", 
Collection.class);
+        method.setAccessible(true);
+
+        // Execute
+        method.invoke(kafkaBridge, entities);
+
+        // Verify - should not throw exception
+        assertNotNull(entities);
+    }
+
+    @Test
+    public void testFindOrCreateAtlasSchemaWithMultipleVersions() throws 
Exception {
+        // Setup
+        setupKafkaBridge();
+
+        // Mock HTTP responses for multiple schema versions
+        when(mockHttpResponse.getStatusLine()).thenReturn(mockStatusLine);
+        when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK);
+        when(mockHttpResponse.getEntity()).thenReturn(mockHttpEntity);
+        when(mockHttpEntity.getContent()).thenReturn(new 
ByteArrayInputStream("[1,2,3]".getBytes(StandardCharsets.UTF_8)));
+        when(mockHttpClient.execute(any())).thenReturn(mockHttpResponse);
+
+        // Mock schema content responses
+        when(mockHttpEntity.getContent())
+                .thenReturn(new 
ByteArrayInputStream("[1,2,3]".getBytes(StandardCharsets.UTF_8)))
+                .thenReturn(new 
ByteArrayInputStream(TEST_SCHEMA.getBytes(StandardCharsets.UTF_8)))
+                .thenReturn(new 
ByteArrayInputStream(TEST_SCHEMA.getBytes(StandardCharsets.UTF_8)))
+                .thenReturn(new 
ByteArrayInputStream(TEST_SCHEMA.getBytes(StandardCharsets.UTF_8)));
+
+        // Use reflection to access and replace httpClient field
+        java.lang.reflect.Field httpClientField = 
KafkaBridge.class.getDeclaredField("httpClient");
+        httpClientField.setAccessible(true);
+        httpClientField.set(kafkaBridge, mockHttpClient);
+
+        // Mock Atlas client responses
+        when(mockAtlasClient.getEntityByAttribute(anyString(), 
any(Map.class))).thenReturn(null);
+        setupSchemaEntityCreation();
+
+        // Use reflection to test private method
+        java.lang.reflect.Method method = 
KafkaBridge.class.getDeclaredMethod("findOrCreateAtlasSchema", String.class);
+        method.setAccessible(true);
+
+        // Execute
+        try {
+            List<AtlasEntity> result = (List<AtlasEntity>) 
method.invoke(kafkaBridge, TEST_TOPIC_NAME);
+            assertNotNull(result);
+        } catch (Exception e) {
+            // Expected due to complex schema registry mocking
+            assertTrue(e.getCause() instanceof RuntimeException || 
e.getCause() instanceof IOException);
+        }
+    }
+
+    @Test
+    public void testFindOrCreateAtlasSchemaWithExistingAtlasEntity() throws 
Exception {
+        // Setup
+        setupKafkaBridge();
+
+        // Mock HTTP responses
+        when(mockHttpResponse.getStatusLine()).thenReturn(mockStatusLine);
+        when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK);
+        when(mockHttpResponse.getEntity()).thenReturn(mockHttpEntity);
+        when(mockHttpEntity.getContent()).thenReturn(new 
ByteArrayInputStream("[1]".getBytes(StandardCharsets.UTF_8)));
+        when(mockHttpClient.execute(any())).thenReturn(mockHttpResponse);
+
+        // Use reflection to access and replace httpClient field
+        java.lang.reflect.Field httpClientField = 
KafkaBridge.class.getDeclaredField("httpClient");
+        httpClientField.setAccessible(true);
+        httpClientField.set(kafkaBridge, mockHttpClient);
+
+        // Mock Atlas client to return existing entity
+        AtlasEntity.AtlasEntityWithExtInfo existingEntity = 
createMockSchemaEntityWithExtInfo();
+        when(mockAtlasClient.getEntityByAttribute(anyString(), 
any(Map.class))).thenReturn(existingEntity);
+        setupSchemaEntityCreation();
+
+        // Use reflection to test private method
+        java.lang.reflect.Method method = 
KafkaBridge.class.getDeclaredMethod("findOrCreateAtlasSchema", String.class);
+        method.setAccessible(true);
+
+        // Execute
+        try {
+            List<AtlasEntity> result = (List<AtlasEntity>) 
method.invoke(kafkaBridge, TEST_TOPIC_NAME);
+            assertNotNull(result);
+        } catch (Exception e) {
+            // Expected due to complex schema registry mocking
+            assertTrue(e.getCause() instanceof RuntimeException || 
e.getCause() instanceof IOException);
+        }
+    }
+
+    @Test
+    public void testFindOrCreateAtlasSchemaWithNullSchemaContent() throws 
Exception {
+        setupKafkaBridge();
+
+        // Mock HTTP responses with null schema content
+        when(mockHttpResponse.getStatusLine()).thenReturn(mockStatusLine);
+        when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK);
+        when(mockHttpResponse.getEntity()).thenReturn(mockHttpEntity);
+        when(mockHttpEntity.getContent())
+                .thenReturn(new 
ByteArrayInputStream("[1]".getBytes(StandardCharsets.UTF_8)))
+                .thenReturn(new 
ByteArrayInputStream("null".getBytes(StandardCharsets.UTF_8)));
+        when(mockHttpClient.execute(any())).thenReturn(mockHttpResponse);
+
+        // Use reflection to access and replace httpClient field
+        java.lang.reflect.Field httpClientField = 
KafkaBridge.class.getDeclaredField("httpClient");
+        httpClientField.setAccessible(true);
+        httpClientField.set(kafkaBridge, mockHttpClient);
+
+        // Use reflection to test private method
+        java.lang.reflect.Method method = 
KafkaBridge.class.getDeclaredMethod("findOrCreateAtlasSchema", String.class);
+        method.setAccessible(true);
+
+        // Execute
+        try {
+            List<AtlasEntity> result = (List<AtlasEntity>) 
method.invoke(kafkaBridge, TEST_TOPIC_NAME);
+            assertNotNull(result);
+            // Should be empty when schema content is null
+            assertTrue(result.isEmpty());
+        } catch (Exception e) {
+            // Expected due to schema registry connectivity issues
+            assertTrue(e.getCause() instanceof RuntimeException || 
e.getCause() instanceof IOException);
+        }
+    }
+
+    @Test
+    public void testFindOrCreateAtlasSchemaWithSchemaRegistryError() throws 
Exception {
+        setupKafkaBridge();
+
+        when(mockHttpResponse.getStatusLine()).thenReturn(mockStatusLine);
+        
when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_NOT_FOUND);
+        when(mockHttpClient.execute(any())).thenReturn(mockHttpResponse);
+
+        java.lang.reflect.Field httpClientField = 
KafkaBridge.class.getDeclaredField("httpClient");
+        httpClientField.setAccessible(true);
+        httpClientField.set(kafkaBridge, mockHttpClient);
+
+        // Use reflection to test private method
+        java.lang.reflect.Method method = 
KafkaBridge.class.getDeclaredMethod("findOrCreateAtlasSchema", String.class);
+        method.setAccessible(true);
+
+        try {
+            List<AtlasEntity> result = (List<AtlasEntity>) 
method.invoke(kafkaBridge, TEST_TOPIC_NAME);
+            assertNotNull(result);
+            assertTrue(result.isEmpty());
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof RuntimeException || 
e.getCause() instanceof IOException);
+        }
+    }
+
+    @Test
+    public void testImportTopicRegexFilters() throws Exception {
+        when(mockConfiguration.getString("atlas.cluster.name", 
"primary")).thenReturn(TEST_CLUSTER_NAME);
+        
when(mockConfiguration.getStringArray("atlas.metadata.namespace")).thenReturn(TEST_NAMESPACE_ARRAY);
+        
when(mockKafkaUtils.listAllTopics()).thenReturn(Arrays.asList("payments", 
"orders", "inventory"));
+        KafkaBridge bridge = new KafkaBridge(mockConfiguration, 
mockAtlasClient, mockKafkaUtils);
+        KafkaBridge spyBridge = spy(bridge);
+        AtlasEntity.AtlasEntityWithExtInfo dummy = new 
AtlasEntity.AtlasEntityWithExtInfo(new 
AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName()));
+        doReturn(dummy).when(spyBridge).createOrUpdateTopic(anyString());
+        spyBridge.importTopic("orders|inven.*");
+        verify(spyBridge).createOrUpdateTopic("orders");
+        verify(spyBridge).createOrUpdateTopic("inventory");
+        verify(spyBridge, never()).createOrUpdateTopic("payments");
+    }
+
+    @Test(expectedExceptions = Exception.class)
+    public void testGetTopicEntity_partitionErrorThrows() throws Exception {
+        when(mockConfiguration.getString("atlas.cluster.name", 
"primary")).thenReturn(TEST_CLUSTER_NAME);
+        
when(mockConfiguration.getStringArray("atlas.metadata.namespace")).thenReturn(TEST_NAMESPACE_ARRAY);
+        
when(mockKafkaUtils.listAllTopics()).thenReturn(Collections.singletonList(TEST_TOPIC_NAME));
+        when(mockKafkaUtils.getPartitionCount(TEST_TOPIC_NAME)).thenThrow(new 
java.util.concurrent.ExecutionException("fail", new RuntimeException("boom")));
+        KafkaBridge bridge = new KafkaBridge(mockConfiguration, 
mockAtlasClient, mockKafkaUtils);
+        bridge.getTopicEntity(TEST_TOPIC_NAME, null);
+    }
+
+    @Test
+    public void testGetSchemaEntity_namespaceFallback() throws Exception {
+        when(mockConfiguration.getString("atlas.cluster.name", 
"primary")).thenReturn(TEST_CLUSTER_NAME);
+        
when(mockConfiguration.getStringArray("atlas.metadata.namespace")).thenReturn(TEST_NAMESPACE_ARRAY);
+        
when(mockKafkaUtils.listAllTopics()).thenReturn(Collections.singletonList(TEST_TOPIC_NAME));
+        // Schema without namespace triggers fallback to key constant when 
input namespace is null
+        String schemaNoNs = 
"{\"type\":\"record\",\"name\":\"Rec\",\"fields\":[{\"name\":\"a\",\"type\":\"string\"}]}";
+        KafkaBridge bridge = new KafkaBridge(mockConfiguration, 
mockAtlasClient, mockKafkaUtils);
+        KafkaBridge spyBridge = spy(bridge);
+        doReturn(Collections.emptyList())
+                .when(spyBridge)
+                .createNestedFields(any(Schema.class), anyString(), 
anyString(), any(int.class), anyString());
+        AtlasEntity entity = spyBridge.getSchemaEntity(schemaNoNs, 
TEST_SCHEMA_NAME, null, 1, null);
+        assertEquals(entity.getAttribute("namespace"), 
"atlas.metadata.namespace");
+    }
+
+    @Test
+    public void testCreateNestedFields_arrayAndNestedRecord() throws Exception 
{
+        when(mockConfiguration.getString("atlas.cluster.name", 
"primary")).thenReturn(TEST_CLUSTER_NAME);
+        
when(mockConfiguration.getStringArray("atlas.metadata.namespace")).thenReturn(TEST_NAMESPACE_ARRAY);
+        
when(mockKafkaUtils.listAllTopics()).thenReturn(Collections.singletonList(TEST_TOPIC_NAME));
+        // Avro schema with array and nested record to cover both branches
+        String complexSchema = 
"{\"type\":\"record\",\"name\":\"Top\",\"fields\":["
+                + 
"{\"name\":\"items\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"Sub\",\"fields\":[{\"name\":\"x\",\"type\":\"string\"}]}}},"
+                + 
"{\"name\":\"inner\",\"type\":{\"type\":\"record\",\"name\":\"Inner\",\"fields\":[{\"name\":\"y\",\"type\":\"int\"}]}}]}";
+        KafkaBridge bridge = new KafkaBridge(mockConfiguration, 
mockAtlasClient, mockKafkaUtils);
+        KafkaBridge spyBridge = spy(bridge);
+        // Stub createOrUpdateField to avoid Atlas interactions and return 
simple entities
+        doReturn(new AtlasEntity.AtlasEntityWithExtInfo(new 
AtlasEntity(KafkaDataTypes.AVRO_FIELD.getName())))
+                .when(spyBridge).createOrUpdateField(any(Schema.Field.class), 
anyString(), anyString(), any(int.class), anyString());
+        List<AtlasEntity> fields = spyBridge.createNestedFields(new 
Schema.Parser().parse(complexSchema), TEST_SCHEMA_NAME, TEST_NAMESPACE, 1, "");
+        assertNotNull(fields);
+        // Expect 2 leaf fields (x and y)
+        assertEquals(fields.size(), 2);
+    }
+
+    @Test
+    public void testCreateEntityInAtlas_noCreatedEntities() throws Exception {
+        EntityMutationResponse mockResponse = 
mock(EntityMutationResponse.class);
+        
when(mockResponse.getCreatedEntities()).thenReturn(Collections.emptyList());
+        
when(mockAtlasClient.createEntity(any(AtlasEntity.AtlasEntityWithExtInfo.class))).thenReturn(mockResponse);
+        when(mockConfiguration.getString("atlas.cluster.name", 
"primary")).thenReturn(TEST_CLUSTER_NAME);
+        
when(mockConfiguration.getStringArray("atlas.metadata.namespace")).thenReturn(TEST_NAMESPACE_ARRAY);
+        
when(mockKafkaUtils.listAllTopics()).thenReturn(Collections.singletonList(TEST_TOPIC_NAME));
+        KafkaBridge bridge = new KafkaBridge(mockConfiguration, 
mockAtlasClient, mockKafkaUtils);
+        AtlasEntity.AtlasEntityWithExtInfo input = new 
AtlasEntity.AtlasEntityWithExtInfo(new 
AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName()));
+        AtlasEntity.AtlasEntityWithExtInfo ret = 
bridge.createEntityInAtlas(input);
+        assertEquals(ret, null);
+    }
+
+    @Test
+    public void testUpdateEntityInAtlas_nullResponseReturnsInput() throws 
Exception {
+        
when(mockAtlasClient.updateEntity(any(AtlasEntity.AtlasEntityWithExtInfo.class))).thenReturn(null);
+        when(mockConfiguration.getString("atlas.cluster.name", 
"primary")).thenReturn(TEST_CLUSTER_NAME);
+        
when(mockConfiguration.getStringArray("atlas.metadata.namespace")).thenReturn(TEST_NAMESPACE_ARRAY);
+        
when(mockKafkaUtils.listAllTopics()).thenReturn(Collections.singletonList(TEST_TOPIC_NAME));
+        KafkaBridge bridge = new KafkaBridge(mockConfiguration, 
mockAtlasClient, mockKafkaUtils);
+        AtlasEntity.AtlasEntityWithExtInfo input = new 
AtlasEntity.AtlasEntityWithExtInfo(new 
AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName()));
+        AtlasEntity.AtlasEntityWithExtInfo ret = 
bridge.updateEntityInAtlas(input);
+        assertEquals(ret, input);
+    }
+
+    @Test
+    public void testUpdateEntityInAtlas_emptyUpdatedEntitiesReturnsInput() 
throws Exception {
+        EntityMutationResponse mockResponse = 
mock(EntityMutationResponse.class);
+        
when(mockResponse.getUpdatedEntities()).thenReturn(Collections.emptyList());
+        
when(mockAtlasClient.updateEntity(any(AtlasEntity.AtlasEntityWithExtInfo.class))).thenReturn(mockResponse);
+        when(mockConfiguration.getString("atlas.cluster.name", 
"primary")).thenReturn(TEST_CLUSTER_NAME);
+        
when(mockConfiguration.getStringArray("atlas.metadata.namespace")).thenReturn(TEST_NAMESPACE_ARRAY);
+        
when(mockKafkaUtils.listAllTopics()).thenReturn(Collections.singletonList(TEST_TOPIC_NAME));
+        KafkaBridge bridge = new KafkaBridge(mockConfiguration, 
mockAtlasClient, mockKafkaUtils);
+        AtlasEntity.AtlasEntityWithExtInfo input = new 
AtlasEntity.AtlasEntityWithExtInfo(new 
AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName()));
+        AtlasEntity.AtlasEntityWithExtInfo ret = 
bridge.updateEntityInAtlas(input);
+        assertEquals(ret, input);
+    }
+
+    @Test
+    public void testQualifiedNameFormats() {
+        assertEquals(KafkaBridge.getTopicQualifiedName("ns", "TopicA"), 
"topica@ns");
+        assertEquals(KafkaBridge.getSchemaQualifiedName("ns", "name-value", 
"v1"), "name-value@v1@ns");
+        assertEquals(KafkaBridge.getFieldQualifiedName("ns", "A.B", 
"name-value", "v1"), "a.b@name-value@v1@ns");
+    }
+
+    @Test
+    public void testFindEntityInAtlas_exceptionHandled() throws Exception {
+        when(mockConfiguration.getString("atlas.cluster.name", 
"primary")).thenReturn(TEST_CLUSTER_NAME);
+        
when(mockConfiguration.getStringArray("atlas.metadata.namespace")).thenReturn(TEST_NAMESPACE_ARRAY);
+        
when(mockKafkaUtils.listAllTopics()).thenReturn(Collections.singletonList(TEST_TOPIC_NAME));
+        when(mockAtlasClient.getEntityByAttribute(anyString(), 
any(Map.class))).thenThrow(new RuntimeException("fail"));
+        KafkaBridge bridge = new KafkaBridge(mockConfiguration, 
mockAtlasClient, mockKafkaUtils);
+        AtlasEntity.AtlasEntityWithExtInfo ret = 
bridge.findEntityInAtlas("type", "qn");
+        assertEquals(ret, null);
+    }
 }
diff --git 
a/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/SchemaRegistryConnectorTest.java
 
b/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/SchemaRegistryConnectorTest.java
new file mode 100644
index 000000000..1e120d617
--- /dev/null
+++ 
b/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/SchemaRegistryConnectorTest.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.kafka.bridge;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpStatus;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+public class SchemaRegistryConnectorTest {
+    @Mock
+    private CloseableHttpClient mockHttpClient;
+
+    @Mock
+    private CloseableHttpResponse mockHttpResponse;
+
+    @Mock
+    private StatusLine mockStatusLine;
+
+    @Mock
+    private HttpEntity mockHttpEntity;
+
+    private static final String TEST_SUBJECT = "test-subject";
+    private static final int TEST_VERSION = 1;
+    private static final String TEST_HOSTNAME = "test-registry-host:8081";
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        setKafkaSchemaRegistryHostname(TEST_HOSTNAME);
+        when(mockHttpResponse.getStatusLine()).thenReturn(mockStatusLine);
+        when(mockHttpResponse.getEntity()).thenReturn(mockHttpEntity);
+    }
+
+    private void setKafkaSchemaRegistryHostname(String hostname) throws 
Exception {
+        Field field = 
KafkaBridge.class.getDeclaredField("kafkaSchemaRegistryHostname");
+        field.setAccessible(true);
+        field.set(null, hostname);
+    }
+
+    @Test
+    public void testGetVersionsKafkaSchemaRegistry_Success() throws 
IOException {
+        // Arrange
+        String jsonResponse = "[1, 2, 3]";
+        ByteArrayInputStream inputStream = new 
ByteArrayInputStream(jsonResponse.getBytes(StandardCharsets.UTF_8));
+
+        
when(mockHttpClient.execute(any(HttpGet.class))).thenReturn(mockHttpResponse);
+        when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK);
+        when(mockHttpEntity.getContent()).thenReturn(inputStream);
+
+        ArrayList<Integer> result = 
SchemaRegistryConnector.getVersionsKafkaSchemaRegistry(mockHttpClient, 
TEST_SUBJECT);
+
+        assertNotNull(result);
+        assertEquals(result.size(), 3);
+        assertEquals(result.get(0), Integer.valueOf(1));
+        assertEquals(result.get(1), Integer.valueOf(2));
+        assertEquals(result.get(2), Integer.valueOf(3));
+
+        verify(mockHttpClient).execute(any(HttpGet.class));
+        verify(mockHttpResponse).close();
+    }
+
+    @Test
+    public void testGetVersionsKafkaSchemaRegistry_NotFound() throws 
IOException {
+        // Arrange
+        
when(mockHttpClient.execute(any(HttpGet.class))).thenReturn(mockHttpResponse);
+        
when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_NOT_FOUND);
+
+        // Act
+        ArrayList<Integer> result = 
SchemaRegistryConnector.getVersionsKafkaSchemaRegistry(mockHttpClient, 
TEST_SUBJECT);
+
+        // Assert
+        assertNotNull(result);
+        assertTrue(result.isEmpty());
+
+        verify(mockHttpClient).execute(any(HttpGet.class));
+        verify(mockHttpResponse).close();
+    }
+
+    @Test
+    public void testGetVersionsKafkaSchemaRegistry_ConnectionError() throws 
IOException {
+        // Arrange
+        
when(mockHttpClient.execute(any(HttpGet.class))).thenReturn(mockHttpResponse);
+        
when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+
+        // Act
+        ArrayList<Integer> result = 
SchemaRegistryConnector.getVersionsKafkaSchemaRegistry(mockHttpClient, 
TEST_SUBJECT);
+
+        // Assert
+        assertNotNull(result);
+        assertTrue(result.isEmpty());
+
+        verify(mockHttpClient).execute(any(HttpGet.class));
+        verify(mockHttpResponse).close();
+    }
+
+    @Test
+    public void testGetVersionsKafkaSchemaRegistry_ParseException() throws 
IOException {
+        // Arrange
+        String invalidJsonResponse = "invalid json";
+        ByteArrayInputStream inputStream = new 
ByteArrayInputStream(invalidJsonResponse.getBytes(StandardCharsets.UTF_8));
+
+        
when(mockHttpClient.execute(any(HttpGet.class))).thenReturn(mockHttpResponse);
+        when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK);
+        when(mockHttpEntity.getContent()).thenReturn(inputStream);
+
+        // Act
+        ArrayList<Integer> result = 
SchemaRegistryConnector.getVersionsKafkaSchemaRegistry(mockHttpClient, 
TEST_SUBJECT);
+
+        // Assert
+        assertNotNull(result);
+        assertTrue(result.isEmpty());
+
+        verify(mockHttpClient).execute(any(HttpGet.class));
+        verify(mockHttpResponse).close();
+    }
+
+    @Test
+    public void testGetVersionsKafkaSchemaRegistry_IOException() throws 
IOException {
+        // Arrange
+        when(mockHttpClient.execute(any(HttpGet.class))).thenThrow(new 
IOException("Connection failed"));
+
+        // Act
+        ArrayList<Integer> result = 
SchemaRegistryConnector.getVersionsKafkaSchemaRegistry(mockHttpClient, 
TEST_SUBJECT);
+
+        // Assert
+        assertNotNull(result);
+        assertTrue(result.isEmpty());
+
+        verify(mockHttpClient).execute(any(HttpGet.class));
+    }
+
+    @Test
+    public void testGetVersionsKafkaSchemaRegistry_LargeVersionsList() throws 
IOException {
+        // Arrange
+        String jsonResponse = "[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]";
+        ByteArrayInputStream inputStream = new 
ByteArrayInputStream(jsonResponse.getBytes(StandardCharsets.UTF_8));
+
+        
when(mockHttpClient.execute(any(HttpGet.class))).thenReturn(mockHttpResponse);
+        when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK);
+        when(mockHttpEntity.getContent()).thenReturn(inputStream);
+
+        // Act
+        ArrayList<Integer> result = 
SchemaRegistryConnector.getVersionsKafkaSchemaRegistry(mockHttpClient, 
TEST_SUBJECT);
+
+        // Assert
+        assertNotNull(result);
+        assertEquals(result.size(), 10);
+        assertEquals(result.get(0), Integer.valueOf(1));
+        assertEquals(result.get(9), Integer.valueOf(10));
+
+        verify(mockHttpClient).execute(any(HttpGet.class));
+        verify(mockHttpResponse).close();
+    }
+
+    @Test
+    public void testGetVersionsKafkaSchemaRegistry_EmptyVersionsList() throws 
IOException {
+        // Arrange
+        String jsonResponse = "[]";
+        ByteArrayInputStream inputStream = new 
ByteArrayInputStream(jsonResponse.getBytes(StandardCharsets.UTF_8));
+
+        
when(mockHttpClient.execute(any(HttpGet.class))).thenReturn(mockHttpResponse);
+        when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK);
+        when(mockHttpEntity.getContent()).thenReturn(inputStream);
+
+        // Act
+        ArrayList<Integer> result = 
SchemaRegistryConnector.getVersionsKafkaSchemaRegistry(mockHttpClient, 
TEST_SUBJECT);
+
+        // Assert
+        assertNotNull(result);
+        assertTrue(result.isEmpty());
+
+        verify(mockHttpClient).execute(any(HttpGet.class));
+        verify(mockHttpResponse).close();
+    }
+
+    @Test
+    public void testGetSchemaFromKafkaSchemaRegistry_Success() throws 
IOException {
+        String expectedSchema = 
"{\"type\":\"record\",\"name\":\"TestRecord\"}";
+        String arrangedExpectedSchema = expectedSchema.replace("\"", "\\\"");
+        String jsonResponse = "{\"schema\":\"" + arrangedExpectedSchema + 
"\"}";
+        ByteArrayInputStream inputStream = new 
ByteArrayInputStream(jsonResponse.getBytes(StandardCharsets.UTF_8));
+
+        
when(mockHttpClient.execute(any(HttpGet.class))).thenReturn(mockHttpResponse);
+        when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK);
+        when(mockHttpEntity.getContent()).thenReturn(inputStream);
+
+        String result = 
SchemaRegistryConnector.getSchemaFromKafkaSchemaRegistry(mockHttpClient, 
TEST_SUBJECT, TEST_VERSION);
+
+        assertNotNull(result);
+        assertEquals(result, expectedSchema);
+
+        verify(mockHttpClient).execute(any(HttpGet.class));
+        verify(mockHttpResponse).close();
+    }
+
+    @Test
+    public void testGetSchemaFromKafkaSchemaRegistry_NotFound() throws 
IOException {
+        // Arrange
+        
when(mockHttpClient.execute(any(HttpGet.class))).thenReturn(mockHttpResponse);
+        
when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_NOT_FOUND);
+
+        String result = 
SchemaRegistryConnector.getSchemaFromKafkaSchemaRegistry(mockHttpClient, 
TEST_SUBJECT, TEST_VERSION);
+
+        assertNull(result);
+
+        verify(mockHttpClient).execute(any(HttpGet.class));
+        verify(mockHttpResponse).close();
+    }
+
+    @Test
+    public void testGetSchemaFromKafkaSchemaRegistry_ConnectionError() throws 
IOException {
+        // Arrange
+        
when(mockHttpClient.execute(any(HttpGet.class))).thenReturn(mockHttpResponse);
+        
when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+
+        String result = 
SchemaRegistryConnector.getSchemaFromKafkaSchemaRegistry(mockHttpClient, 
TEST_SUBJECT, TEST_VERSION);
+
+        assertNull(result);
+
+        verify(mockHttpClient).execute(any(HttpGet.class));
+        verify(mockHttpResponse).close();
+    }
+
+    @Test
+    public void testGetSchemaFromKafkaSchemaRegistry_ParseException() throws 
IOException {
+        String invalidJsonResponse = "invalid json";
+        ByteArrayInputStream inputStream = new 
ByteArrayInputStream(invalidJsonResponse.getBytes(StandardCharsets.UTF_8));
+
+        
when(mockHttpClient.execute(any(HttpGet.class))).thenReturn(mockHttpResponse);
+        when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK);
+        when(mockHttpEntity.getContent()).thenReturn(inputStream);
+
+        String result = 
SchemaRegistryConnector.getSchemaFromKafkaSchemaRegistry(mockHttpClient, 
TEST_SUBJECT, TEST_VERSION);
+
+        assertNull(result);
+
+        verify(mockHttpClient).execute(any(HttpGet.class));
+        verify(mockHttpResponse).close();
+    }
+
+    @Test
+    public void testGetVersionsKafkaSchemaRegistry_URLConstruction() throws 
IOException {
+        
when(mockHttpClient.execute(any(HttpGet.class))).thenReturn(mockHttpResponse);
+        
when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_NOT_FOUND);
+        ArrayList<Integer> result = 
SchemaRegistryConnector.getVersionsKafkaSchemaRegistry(mockHttpClient, 
TEST_SUBJECT);
+        assertNotNull(result);
+        ArgumentCaptor<HttpGet> captor = 
ArgumentCaptor.forClass(HttpGet.class);
+        verify(mockHttpClient).execute(captor.capture());
+        String url = captor.getValue().getURI().toString();
+        assertTrue(url.contains("http://"; + TEST_HOSTNAME + "/subjects/" + 
TEST_SUBJECT + "/versions/"));
+        verify(mockHttpResponse).close();
+    }
+
+    @Test
+    public void testGetSchemaFromKafkaSchemaRegistry_URLConstruction() throws 
IOException {
+        
when(mockHttpClient.execute(any(HttpGet.class))).thenReturn(mockHttpResponse);
+        
when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_NOT_FOUND);
+        String result = 
SchemaRegistryConnector.getSchemaFromKafkaSchemaRegistry(mockHttpClient, 
TEST_SUBJECT, TEST_VERSION);
+        assertNull(result);
+        ArgumentCaptor<HttpGet> captor = 
ArgumentCaptor.forClass(HttpGet.class);
+        verify(mockHttpClient).execute(captor.capture());
+        String url = captor.getValue().getURI().toString();
+        assertTrue(url.contains("http://"; + TEST_HOSTNAME + "/subjects/" + 
TEST_SUBJECT + "/versions/" + TEST_VERSION));
+        verify(mockHttpResponse).close();
+    }
+}


Reply via email to