This is an automated email from the ASF dual-hosted git repository.
abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 35b970935f9 Better error handling when retrieving Avro schemas from registry (#16684)
35b970935f9 is described below
commit 35b970935f92c16f726df99806043b298260d112
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Tue Jul 2 16:48:34 2024 -0700
Better error handling when retrieving Avro schemas from registry (#16684)
* Handle RestClientException separately, instead of returning a generic error.
- Add tests
- Clean up the tests; remove the legacy expected exception pattern
- Better test assertions
* Rename tests
* checkstyle fixes
---
.../avro/SchemaRegistryBasedAvroBytesDecoder.java | 33 ++++-
.../SchemaRegistryBasedAvroBytesDecoderTest.java | 152 ++++++++++++++++-----
2 files changed, 147 insertions(+), 38 deletions(-)
diff --git
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
index 167b036e7d1..c7c7438b1cd 100644
---
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
+++
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
@@ -139,11 +139,34 @@ public class SchemaRegistryBasedAvroBytesDecoder
implements AvroBytesDecoder
ParsedSchema parsedSchema = registry.getSchemaById(id);
schema = parsedSchema instanceof AvroSchema ? ((AvroSchema)
parsedSchema).rawSchema() : null;
}
- catch (IOException | RestClientException ex) {
- throw new ParseException(null, ex, "Failed to fetch Avro schema id[%s]
from registry. Check if the schema "
- + "exists in the registry. Otherwise
it could mean that there is "
- + "malformed data in the stream or
data that doesn't conform to the schema "
- + "specified.", id);
+ catch (IOException ex1) {
+ throw new ParseException(
+ null,
+ ex1,
+ "Failed to fetch Avro schema id[%s] from registry. Check if the
schema exists in the registry. Otherwise it"
+ + " could mean that there is malformed data in the stream or data
that doesn't conform to the schema"
+ + " specified.",
+ id
+ );
+ }
+ catch (RestClientException ex2) {
+ if (ex2.getErrorCode() == 401) {
+ throw new ParseException(
+ null,
+ ex2,
+ "Failed to authenticate to schema registry for Avro schema id[%s].
Please check your credentials.",
+ id
+ );
+ }
+ // For all other errors, just include the code and message received from
the library.
+ throw new ParseException(
+ null,
+ ex2,
+ "Failed to fetch Avro schema id[%s] from registry. Error code[%s]
and message[%s].",
+ id,
+ ex2.getErrorCode(),
+ ex2.getMessage()
+ );
}
if (schema == null) {
throw new ParseException(null, "No Avro schema id[%s] in registry", id);
diff --git
a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
index 7644b61bb10..2aad88e8ff5 100644
---
a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
+++
b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
@@ -61,15 +62,15 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
@Test
public void testMultipleUrls() throws Exception
{
+ // Given
String json = "{\"urls\":[\"http://localhost\"],\"type\":
\"schema_registry\"}";
ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, new
DefaultObjectMapper())
);
- SchemaRegistryBasedAvroBytesDecoder decoder;
- decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
- .readerFor(AvroBytesDecoder.class)
- .readValue(json);
+
+ // When
+ SchemaRegistryBasedAvroBytesDecoder decoder =
mapper.readerFor(AvroBytesDecoder.class).readValue(json);
// Then
Assert.assertNotEquals(decoder.hashCode(), 0);
@@ -78,15 +79,15 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
@Test
public void testUrl() throws Exception
{
+ // Given
String json = "{\"url\":\"http://localhost\",\"type\":
\"schema_registry\"}";
ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, new
DefaultObjectMapper())
);
- SchemaRegistryBasedAvroBytesDecoder decoder;
- decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
- .readerFor(AvroBytesDecoder.class)
- .readValue(json);
+
+ // When
+ SchemaRegistryBasedAvroBytesDecoder decoder =
mapper.readerFor(AvroBytesDecoder.class).readValue(json);
// Then
Assert.assertNotEquals(decoder.hashCode(), 0);
@@ -95,15 +96,15 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
@Test
public void testConfig() throws Exception
{
+ // Given
String json = "{\"url\":\"http://localhost\",\"type\":
\"schema_registry\", \"config\":{}}";
ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, new
DefaultObjectMapper())
);
- SchemaRegistryBasedAvroBytesDecoder decoder;
- decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
- .readerFor(AvroBytesDecoder.class)
- .readValue(json);
+
+ // When
+ SchemaRegistryBasedAvroBytesDecoder decoder =
mapper.readerFor(AvroBytesDecoder.class).readValue(json);
// Then
Assert.assertNotEquals(decoder.hashCode(), 0);
@@ -120,21 +121,33 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
byte[] bytes = getAvroDatum(schema, someAvroDatum);
ByteBuffer bb = ByteBuffer.allocate(bytes.length + 5).put((byte)
0).putInt(1234).put(bytes);
bb.rewind();
+
// When
- new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
+ GenericRecord parse = new
SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
+
+ // Then
+ Assert.assertEquals(schema, parse.getSchema());
}
- @Test(expected = ParseException.class)
+ @Test
public void testParseCorruptedNotEnoughBytesToEvenGetSchemaInfo()
{
// Given
ByteBuffer bb = ByteBuffer.allocate(2).put((byte) 0).put(1, (byte) 1);
bb.rewind();
- // When
- new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
+
+ // When / Then
+ final ParseException e = Assert.assertThrows(
+ ParseException.class,
+ () -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb)
+ );
+ MatcherAssert.assertThat(
+ e.getMessage(),
+ CoreMatchers.containsString("Failed to decode avro message, not enough
bytes to decode (2)")
+ );
}
- @Test(expected = ParseException.class)
+ @Test
public void testParseCorruptedPartial() throws Exception
{
// Given
@@ -145,19 +158,30 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
byte[] bytes = getAvroDatum(schema, someAvroDatum);
ByteBuffer bb = ByteBuffer.allocate(4 + 5).put((byte)
0).putInt(1234).put(bytes, 5, 4);
bb.rewind();
- // When
- new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
+
+ // When / Then
+ final ParseException e = Assert.assertThrows(
+ ParseException.class,
+ () -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb)
+ );
+ MatcherAssert.assertThat(e.getCause(),
CoreMatchers.instanceOf(IOException.class));
+ MatcherAssert.assertThat(e.getMessage(),
CoreMatchers.containsString("Failed to decode Avro message for schema
id[1234]"));
}
- @Test(expected = ParseException.class)
+ @Test
public void testParseWrongSchemaType() throws Exception
{
// Given
Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(Mockito.mock(ParsedSchema.class));
ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234);
bb.rewind();
- // When
- new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
+
+ // When / Then
+ final ParseException e = Assert.assertThrows(
+ ParseException.class,
+ () -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb)
+ );
+ MatcherAssert.assertThat(e.getMessage(), CoreMatchers.containsString("No
Avro schema id[1234] in registry"));
}
@Test
@@ -167,7 +191,8 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
Mockito.when(registry.getSchemaById(ArgumentMatchers.anyInt())).thenThrow(new
IOException("no pasaran"));
ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234);
bb.rewind();
- // When
+
+ // When / Then
final ParseException e = Assert.assertThrows(
ParseException.class,
() -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb)
@@ -187,17 +212,20 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
@Test
public void testParseHeader() throws JsonProcessingException
{
+ // Given
String json =
"{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{},\"headers\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\",
\"config\":{\"registry.header.prop.2\":\"value.2\",
\"registry.header.prop.3\":\"value.3\"}},\"registry.header.prop.1\":\"value.1\",\"registry.header.prop.2\":\"value.4\"}}";
ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, new
DefaultObjectMapper())
);
- SchemaRegistryBasedAvroBytesDecoder decoder;
- decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
- .readerFor(AvroBytesDecoder.class)
- .readValue(json);
+ SchemaRegistryBasedAvroBytesDecoder decoder =
mapper.readerFor(AvroBytesDecoder.class).readValue(json);
- Map<String, String> header =
DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getHeaders(),
SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new
DefaultObjectMapper());
+ // When
+ Map<String, String> header =
DynamicConfigProviderUtils.extraConfigAndSetStringMap(
+ decoder.getHeaders(),
+ SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY,
+ new DefaultObjectMapper()
+ );
// Then
Assert.assertEquals(3, header.size());
@@ -209,17 +237,20 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
@Test
public void testParseConfig() throws JsonProcessingException
{
+ // Given
String json =
"{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\",
\"config\":{\"registry.config.prop.2\":\"value.2\",
\"registry.config.prop.3\":\"value.3\"}},\"registry.config.prop.1\":\"value.1\",\"registry.config.prop.2\":\"value.4\"},\"headers\":{}}";
ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, new
DefaultObjectMapper())
);
- SchemaRegistryBasedAvroBytesDecoder decoder;
- decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
- .readerFor(AvroBytesDecoder.class)
- .readValue(json);
+ SchemaRegistryBasedAvroBytesDecoder decoder =
mapper.readerFor(AvroBytesDecoder.class).readValue(json);
- Map<String, ?> config =
DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getConfig(),
SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new
DefaultObjectMapper());
+ // When
+ Map<String, ?> config =
DynamicConfigProviderUtils.extraConfigAndSetStringMap(
+ decoder.getConfig(),
+ SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY,
+ new DefaultObjectMapper()
+ );
// Then
Assert.assertEquals(3, config.size());
@@ -227,4 +258,59 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
Assert.assertEquals("value.2", config.get("registry.config.prop.2"));
Assert.assertEquals("value.3", config.get("registry.config.prop.3"));
}
+
+ @Test
+ public void testParseWhenUnauthenticatedException() throws IOException,
RestClientException
+ {
+ // Given
+ Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234)))
+ .thenThrow(new RestClientException("unauthenticated", 401, 401));
+ GenericRecord someAvroDatum =
AvroStreamInputRowParserTest.buildSomeAvroDatum();
+ Schema schema = SomeAvroDatum.getClassSchema();
+ byte[] bytes = getAvroDatum(schema, someAvroDatum);
+ ByteBuffer bb = ByteBuffer.allocate(4 + 5).put((byte)
0).putInt(1234).put(bytes, 5, 4);
+ bb.rewind();
+
+ // When / Then
+ final ParseException e = Assert.assertThrows(
+ ParseException.class,
+ () -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb)
+ );
+ MatcherAssert.assertThat(e.getCause(),
CoreMatchers.instanceOf(RestClientException.class));
+ MatcherAssert.assertThat(e.getCause().getMessage(),
CoreMatchers.containsString("unauthenticated"));
+ MatcherAssert.assertThat(
+ e.getMessage(),
+ CoreMatchers.containsString(
+ "Failed to authenticate to schema registry for Avro schema
id[1234]. Please check your credentials"
+ )
+ );
+ }
+
+ @Test
+ public void testParseWhenResourceNotFoundException() throws IOException,
RestClientException
+ {
+ // Given
+ Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234)))
+ .thenThrow(new RestClientException("resource doesn't exist", 404,
404));
+ GenericRecord someAvroDatum =
AvroStreamInputRowParserTest.buildSomeAvroDatum();
+ Schema schema = SomeAvroDatum.getClassSchema();
+ byte[] bytes = getAvroDatum(schema, someAvroDatum);
+ ByteBuffer bb = ByteBuffer.allocate(4 + 5).put((byte)
0).putInt(1234).put(bytes, 5, 4);
+ bb.rewind();
+
+ // When / Then
+ final ParseException e = Assert.assertThrows(
+ ParseException.class,
+ () -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb)
+ );
+ MatcherAssert.assertThat(e.getCause(),
CoreMatchers.instanceOf(RestClientException.class));
+ MatcherAssert.assertThat(e.getCause().getMessage(),
CoreMatchers.containsString("resource doesn't exist"));
+ MatcherAssert.assertThat(
+ e.getMessage(),
+ CoreMatchers.containsString(
+ "Failed to fetch Avro schema id[1234] from registry."
+ + " Error code[404] and message[resource doesn't exist; error
code: 404]."
+ )
+ );
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]