This is an automated email from the ASF dual-hosted git repository.

abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 35b970935f9 Better error handling when retrieving Avro schemas from registry (#16684)
35b970935f9 is described below

commit 35b970935f92c16f726df99806043b298260d112
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Tue Jul 2 16:48:34 2024 -0700

    Better error handling when retrieving Avro schemas from registry (#16684)
    
    * Handle RestClientException separately, instead of returning a generic error.
    
    - Add tests
    - Clean up the tests; remove the legacy expected exception pattern
    - Better test assertions
    
    * Rename tests
    
    * checkstyle fixes
---
 .../avro/SchemaRegistryBasedAvroBytesDecoder.java  |  33 ++++-
 .../SchemaRegistryBasedAvroBytesDecoderTest.java   | 152 ++++++++++++++++-----
 2 files changed, 147 insertions(+), 38 deletions(-)

diff --git 
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
 
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
index 167b036e7d1..c7c7438b1cd 100644
--- 
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
+++ 
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
@@ -139,11 +139,34 @@ public class SchemaRegistryBasedAvroBytesDecoder 
implements AvroBytesDecoder
       ParsedSchema parsedSchema = registry.getSchemaById(id);
       schema = parsedSchema instanceof AvroSchema ? ((AvroSchema) 
parsedSchema).rawSchema() : null;
     }
-    catch (IOException | RestClientException ex) {
-      throw new ParseException(null, ex, "Failed to fetch Avro schema id[%s] 
from registry. Check if the schema "
-                                         + "exists in the registry. Otherwise 
it could mean that there is "
-                                         + "malformed data in the stream or 
data that doesn't conform to the schema "
-                                         + "specified.", id);
+    catch (IOException ex1) {
+      throw new ParseException(
+          null,
+          ex1,
+          "Failed to fetch Avro schema id[%s] from registry. Check if the 
schema exists in the registry. Otherwise it"
+          + " could mean that there is malformed data in the stream or data 
that doesn't conform to the schema"
+          + " specified.",
+          id
+      );
+    }
+    catch (RestClientException ex2) {
+      if (ex2.getErrorCode() == 401) {
+        throw new ParseException(
+            null,
+            ex2,
+            "Failed to authenticate to schema registry for Avro schema id[%s]. 
Please check your credentials.",
+            id
+        );
+      }
+      // For all other errors, just include the code and message received from 
the library.
+      throw new ParseException(
+          null,
+          ex2,
+          "Failed to fetch Avro schema id[%s] from registry. Error code[%s] 
and message[%s].",
+          id,
+          ex2.getErrorCode(),
+          ex2.getMessage()
+      );
     }
     if (schema == null) {
       throw new ParseException(null, "No Avro schema id[%s] in registry", id);
diff --git 
a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
 
b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
index 7644b61bb10..2aad88e8ff5 100644
--- 
a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
+++ 
b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import 
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
@@ -61,15 +62,15 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
   @Test
   public void testMultipleUrls() throws Exception
   {
+    // Given
     String json = "{\"urls\":[\"http://localhost\"],\"type\": 
\"schema_registry\"}";
     ObjectMapper mapper = new DefaultObjectMapper();
     mapper.setInjectableValues(
         new InjectableValues.Std().addValue(ObjectMapper.class, new 
DefaultObjectMapper())
     );
-    SchemaRegistryBasedAvroBytesDecoder decoder;
-    decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
-        .readerFor(AvroBytesDecoder.class)
-        .readValue(json);
+
+    // When
+    SchemaRegistryBasedAvroBytesDecoder decoder = 
mapper.readerFor(AvroBytesDecoder.class).readValue(json);
 
     // Then
     Assert.assertNotEquals(decoder.hashCode(), 0);
@@ -78,15 +79,15 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
   @Test
   public void testUrl() throws Exception
   {
+    // Given
     String json = "{\"url\":\"http://localhost\",\"type\": 
\"schema_registry\"}";
     ObjectMapper mapper = new DefaultObjectMapper();
     mapper.setInjectableValues(
         new InjectableValues.Std().addValue(ObjectMapper.class, new 
DefaultObjectMapper())
     );
-    SchemaRegistryBasedAvroBytesDecoder decoder;
-    decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
-        .readerFor(AvroBytesDecoder.class)
-        .readValue(json);
+
+    // When
+    SchemaRegistryBasedAvroBytesDecoder decoder = 
mapper.readerFor(AvroBytesDecoder.class).readValue(json);
 
     // Then
     Assert.assertNotEquals(decoder.hashCode(), 0);
@@ -95,15 +96,15 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
   @Test
   public void testConfig() throws Exception
   {
+    // Given
     String json = "{\"url\":\"http://localhost\",\"type\": 
\"schema_registry\", \"config\":{}}";
     ObjectMapper mapper = new DefaultObjectMapper();
     mapper.setInjectableValues(
         new InjectableValues.Std().addValue(ObjectMapper.class, new 
DefaultObjectMapper())
     );
-    SchemaRegistryBasedAvroBytesDecoder decoder;
-    decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
-        .readerFor(AvroBytesDecoder.class)
-        .readValue(json);
+
+    // When
+    SchemaRegistryBasedAvroBytesDecoder decoder = 
mapper.readerFor(AvroBytesDecoder.class).readValue(json);
 
     // Then
     Assert.assertNotEquals(decoder.hashCode(), 0);
@@ -120,21 +121,33 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
     byte[] bytes = getAvroDatum(schema, someAvroDatum);
     ByteBuffer bb = ByteBuffer.allocate(bytes.length + 5).put((byte) 
0).putInt(1234).put(bytes);
     bb.rewind();
+
     // When
-    new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
+    GenericRecord parse = new 
SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
+
+    // Then
+    Assert.assertEquals(schema, parse.getSchema());
   }
 
-  @Test(expected = ParseException.class)
+  @Test
   public void testParseCorruptedNotEnoughBytesToEvenGetSchemaInfo()
   {
     // Given
     ByteBuffer bb = ByteBuffer.allocate(2).put((byte) 0).put(1, (byte) 1);
     bb.rewind();
-    // When
-    new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
+
+    // When / Then
+    final ParseException e = Assert.assertThrows(
+        ParseException.class,
+        () -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb)
+    );
+    MatcherAssert.assertThat(
+        e.getMessage(),
+        CoreMatchers.containsString("Failed to decode avro message, not enough 
bytes to decode (2)")
+    );
   }
 
-  @Test(expected = ParseException.class)
+  @Test
   public void testParseCorruptedPartial() throws Exception
   {
     // Given
@@ -145,19 +158,30 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
     byte[] bytes = getAvroDatum(schema, someAvroDatum);
     ByteBuffer bb = ByteBuffer.allocate(4 + 5).put((byte) 
0).putInt(1234).put(bytes, 5, 4);
     bb.rewind();
-    // When
-    new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
+
+    // When / Then
+    final ParseException e = Assert.assertThrows(
+        ParseException.class,
+        () -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb)
+    );
+    MatcherAssert.assertThat(e.getCause(), 
CoreMatchers.instanceOf(IOException.class));
+    MatcherAssert.assertThat(e.getMessage(), 
CoreMatchers.containsString("Failed to decode Avro message for schema 
id[1234]"));
   }
 
-  @Test(expected = ParseException.class)
+  @Test
   public void testParseWrongSchemaType() throws Exception
   {
     // Given
     
Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(Mockito.mock(ParsedSchema.class));
     ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234);
     bb.rewind();
-    // When
-    new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
+
+    // When / Then
+    final ParseException e = Assert.assertThrows(
+        ParseException.class,
+        () -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb)
+    );
+    MatcherAssert.assertThat(e.getMessage(), CoreMatchers.containsString("No 
Avro schema id[1234] in registry"));
   }
 
   @Test
@@ -167,7 +191,8 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
     
Mockito.when(registry.getSchemaById(ArgumentMatchers.anyInt())).thenThrow(new 
IOException("no pasaran"));
     ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234);
     bb.rewind();
-    // When
+
+    // When / Then
     final ParseException e = Assert.assertThrows(
         ParseException.class,
         () -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb)
@@ -187,17 +212,20 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
   @Test
   public void testParseHeader() throws JsonProcessingException
   {
+    // Given
     String json = 
"{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{},\"headers\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\",
 \"config\":{\"registry.header.prop.2\":\"value.2\", 
\"registry.header.prop.3\":\"value.3\"}},\"registry.header.prop.1\":\"value.1\",\"registry.header.prop.2\":\"value.4\"}}";
     ObjectMapper mapper = new DefaultObjectMapper();
     mapper.setInjectableValues(
         new InjectableValues.Std().addValue(ObjectMapper.class, new 
DefaultObjectMapper())
     );
-    SchemaRegistryBasedAvroBytesDecoder decoder;
-    decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
-        .readerFor(AvroBytesDecoder.class)
-        .readValue(json);
+    SchemaRegistryBasedAvroBytesDecoder decoder = 
mapper.readerFor(AvroBytesDecoder.class).readValue(json);
 
-    Map<String, String> header = 
DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getHeaders(), 
SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new 
DefaultObjectMapper());
+    // When
+    Map<String, String> header = 
DynamicConfigProviderUtils.extraConfigAndSetStringMap(
+        decoder.getHeaders(),
+        SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY,
+        new DefaultObjectMapper()
+    );
 
     // Then
     Assert.assertEquals(3, header.size());
@@ -209,17 +237,20 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
   @Test
   public void testParseConfig() throws JsonProcessingException
   {
+    // Given
     String json = 
"{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\",
 \"config\":{\"registry.config.prop.2\":\"value.2\", 
\"registry.config.prop.3\":\"value.3\"}},\"registry.config.prop.1\":\"value.1\",\"registry.config.prop.2\":\"value.4\"},\"headers\":{}}";
     ObjectMapper mapper = new DefaultObjectMapper();
     mapper.setInjectableValues(
         new InjectableValues.Std().addValue(ObjectMapper.class, new 
DefaultObjectMapper())
     );
-    SchemaRegistryBasedAvroBytesDecoder decoder;
-    decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
-        .readerFor(AvroBytesDecoder.class)
-        .readValue(json);
+    SchemaRegistryBasedAvroBytesDecoder decoder = 
mapper.readerFor(AvroBytesDecoder.class).readValue(json);
 
-    Map<String, ?> config = 
DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getConfig(), 
SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new 
DefaultObjectMapper());
+    // When
+    Map<String, ?> config = 
DynamicConfigProviderUtils.extraConfigAndSetStringMap(
+        decoder.getConfig(),
+        SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY,
+        new DefaultObjectMapper()
+    );
 
     // Then
     Assert.assertEquals(3, config.size());
@@ -227,4 +258,59 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
     Assert.assertEquals("value.2", config.get("registry.config.prop.2"));
     Assert.assertEquals("value.3", config.get("registry.config.prop.3"));
   }
+
+  @Test
+  public void testParseWhenUnauthenticatedException() throws IOException, 
RestClientException
+  {
+    // Given
+    Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234)))
+           .thenThrow(new RestClientException("unauthenticated", 401, 401));
+    GenericRecord someAvroDatum = 
AvroStreamInputRowParserTest.buildSomeAvroDatum();
+    Schema schema = SomeAvroDatum.getClassSchema();
+    byte[] bytes = getAvroDatum(schema, someAvroDatum);
+    ByteBuffer bb = ByteBuffer.allocate(4 + 5).put((byte) 
0).putInt(1234).put(bytes, 5, 4);
+    bb.rewind();
+
+    // When / Then
+    final ParseException e = Assert.assertThrows(
+        ParseException.class,
+        () -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb)
+    );
+    MatcherAssert.assertThat(e.getCause(), 
CoreMatchers.instanceOf(RestClientException.class));
+    MatcherAssert.assertThat(e.getCause().getMessage(), 
CoreMatchers.containsString("unauthenticated"));
+    MatcherAssert.assertThat(
+        e.getMessage(),
+        CoreMatchers.containsString(
+            "Failed to authenticate to schema registry for Avro schema 
id[1234]. Please check your credentials"
+        )
+    );
+  }
+
+  @Test
+  public void testParseWhenResourceNotFoundException() throws IOException, 
RestClientException
+  {
+    // Given
+    Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234)))
+           .thenThrow(new RestClientException("resource doesn't exist", 404, 
404));
+    GenericRecord someAvroDatum = 
AvroStreamInputRowParserTest.buildSomeAvroDatum();
+    Schema schema = SomeAvroDatum.getClassSchema();
+    byte[] bytes = getAvroDatum(schema, someAvroDatum);
+    ByteBuffer bb = ByteBuffer.allocate(4 + 5).put((byte) 
0).putInt(1234).put(bytes, 5, 4);
+    bb.rewind();
+
+    // When / Then
+    final ParseException e = Assert.assertThrows(
+        ParseException.class,
+        () -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb)
+    );
+    MatcherAssert.assertThat(e.getCause(), 
CoreMatchers.instanceOf(RestClientException.class));
+    MatcherAssert.assertThat(e.getCause().getMessage(), 
CoreMatchers.containsString("resource doesn't exist"));
+    MatcherAssert.assertThat(
+        e.getMessage(),
+        CoreMatchers.containsString(
+            "Failed to fetch Avro schema id[1234] from registry."
+            + " Error code[404] and message[resource doesn't exist; error 
code: 404]."
+        )
+    );
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to