This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new aa7cb50 Add DynamicConfigProvider for Schema Registry (#11362)
aa7cb50 is described below
commit aa7cb50f24a7e1a2c2506310fe3468c60f16766c
Author: Yi Yuan <[email protected]>
AuthorDate: Wed Aug 4 04:24:52 2021 +0800
Add DynamicConfigProvider for Schema Registry (#11362)
* add_DynamicConfigProvider_for_schema_registry
* bug fixed
* add document
* fix document
* fix spot bug
* fix document
* inject ObjectMapper
* add DynamicConfigProviderUtils
* add UT
* bug fixed
Co-authored-by: yuanyi <[email protected]>
---
.../druid/utils/DynamicConfigProviderUtils.java | 73 +++++++++++++++++++
.../utils/DynamicConfigProviderUtilsTest.java | 84 ++++++++++++++++++++++
docs/ingestion/data-formats.md | 25 +++++--
.../avro/SchemaRegistryBasedAvroBytesDecoder.java | 25 ++++---
.../data/input/AvroStreamInputFormatTest.java | 6 +-
.../SchemaRegistryBasedAvroBytesDecoderTest.java | 64 ++++++++++++++++-
.../SchemaRegistryBasedProtobufBytesDecoder.java | 25 ++++---
.../input/protobuf/ProtobufInputFormatTest.java | 6 +-
...chemaRegistryBasedProtobufBytesDecoderTest.java | 68 ++++++++++++++++--
9 files changed, 345 insertions(+), 31 deletions(-)
diff --git
a/core/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java
b/core/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java
new file mode 100644
index 0000000..4c45262
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.utils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.metadata.DynamicConfigProvider;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Helpers that merge a static configuration map with the entries supplied by a {@link DynamicConfigProvider}
+ * whose definition is embedded in that same map under a caller-chosen key.
+ */
+public class DynamicConfigProviderUtils
+{
+  /**
+   * Builds a string-valued copy of {@code config} with the dynamically provided entries merged in.
+   *
+   * <p>Every entry except the one stored under {@code dynamicConfigProviderKey} is copied with its value
+   * converted through {@code toString()}; a null value is copied as null. The value stored under
+   * {@code dynamicConfigProviderKey}, if present, is converted to a {@link DynamicConfigProvider} and the
+   * entries it returns are put last, so they replace static entries that use the same key.
+   *
+   * @param config                   static configuration, optionally holding a provider definition; may be null
+   * @param dynamicConfigProviderKey key under which the provider definition is stored; must not be null
+   * @param mapper                   mapper used to convert the provider definition
+   * @return a new mutable map, empty when {@code config} is null
+   */
+  public static Map<String, String> extraConfigAndSetStringMap(
+      Map<String, Object> config,
+      String dynamicConfigProviderKey,
+      ObjectMapper mapper
+  )
+  {
+    HashMap<String, String> newConfig = new HashMap<>();
+    if (config != null) {
+      for (Map.Entry<String, Object> entry : config.entrySet()) {
+        if (!dynamicConfigProviderKey.equals(entry.getKey())) {
+          // Guard against null values: calling toString() on them would throw a NullPointerException,
+          // whereas extraConfigAndSetObjectMap copies them through unchanged.
+          Object value = entry.getValue();
+          newConfig.put(entry.getKey(), value == null ? null : value.toString());
+        }
+      }
+      // Added after the static entries so that provider-supplied values take precedence.
+      newConfig.putAll(extraConfigFromProvider(config.get(dynamicConfigProviderKey), mapper));
+    }
+    return newConfig;
+  }
+
+  /**
+   * Builds a copy of {@code config} with the dynamically provided entries merged in, keeping static values
+   * as the objects they already are.
+   *
+   * <p>The entry stored under {@code dynamicConfigProviderKey} is left out of the result; if present, it is
+   * converted to a {@link DynamicConfigProvider} and the entries it returns are put last, so they replace
+   * static entries that use the same key.
+   *
+   * @param config                   static configuration, optionally holding a provider definition; may be null
+   * @param dynamicConfigProviderKey key under which the provider definition is stored; must not be null
+   * @param mapper                   mapper used to convert the provider definition
+   * @return a new mutable map, empty when {@code config} is null
+   */
+  public static Map<String, Object> extraConfigAndSetObjectMap(
+      Map<String, Object> config,
+      String dynamicConfigProviderKey,
+      ObjectMapper mapper
+  )
+  {
+    HashMap<String, Object> newConfig = new HashMap<>();
+    if (config != null) {
+      for (Map.Entry<String, Object> entry : config.entrySet()) {
+        if (!dynamicConfigProviderKey.equals(entry.getKey())) {
+          newConfig.put(entry.getKey(), entry.getValue());
+        }
+      }
+      // Added after the static entries so that provider-supplied values take precedence.
+      newConfig.putAll(extraConfigFromProvider(config.get(dynamicConfigProviderKey), mapper));
+    }
+    return newConfig;
+  }
+
+  /**
+   * Converts a provider definition to a {@link DynamicConfigProvider} and returns its configuration.
+   *
+   * @param dynamicConfigProviderJson provider definition as found in the configuration map, or null
+   * @param mapper                    mapper used for the conversion
+   * @return the result of the provider's {@code getConfig()}, or an empty map when no definition was given
+   */
+  private static Map<String, String> extraConfigFromProvider(Object dynamicConfigProviderJson, ObjectMapper mapper)
+  {
+    if (dynamicConfigProviderJson != null) {
+      DynamicConfigProvider dynamicConfigProvider =
+          mapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class);
+      return dynamicConfigProvider.getConfig();
+    }
+    return Collections.emptyMap();
+  }
+}
diff --git
a/core/src/test/java/org/apache/druid/utils/DynamicConfigProviderUtilsTest.java
b/core/src/test/java/org/apache/druid/utils/DynamicConfigProviderUtilsTest.java
new file mode 100644
index 0000000..496acfa
--- /dev/null
+++
b/core/src/test/java/org/apache/druid/utils/DynamicConfigProviderUtilsTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.utils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.metadata.DynamicConfigProvider;
+import org.apache.druid.metadata.MapStringDynamicConfigProvider;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+
+import java.util.Map;
+
+@RunWith(Enclosed.class)
+public class DynamicConfigProviderUtilsTest
+{
+  public static class ThrowIfURLHasNotAllowedPropertiesTest
+  {
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private final String DYNAMIC_CONFIG_PROVIDER = "druid.dynamic.config.provider";
+
+    /**
+     * The provider entry itself must not appear in the result, and the provider's "prop2" must replace
+     * the static "prop2".
+     */
+    @Test
+    public void testExtraConfigAndSetStringMap()
+    {
+      Map<String, String> merged = DynamicConfigProviderUtils.extraConfigAndSetStringMap(
+          propertiesWithProvider(),
+          DYNAMIC_CONFIG_PROVIDER,
+          OBJECT_MAPPER
+      );
+
+      Assert.assertEquals(2, merged.size());
+      Assert.assertEquals("value1", merged.get("prop1"));
+      Assert.assertEquals("value2", merged.get("prop2"));
+    }
+
+    /**
+     * Same expectations as the string-map variant, checked through the object-valued result.
+     */
+    @Test
+    public void testExtraConfigAndSetObjectMap()
+    {
+      Map<String, Object> merged = DynamicConfigProviderUtils.extraConfigAndSetObjectMap(
+          propertiesWithProvider(),
+          DYNAMIC_CONFIG_PROVIDER,
+          OBJECT_MAPPER
+      );
+
+      Assert.assertEquals(2, merged.size());
+      Assert.assertEquals("value1", merged.get("prop1").toString());
+      Assert.assertEquals("value2", merged.get("prop2").toString());
+    }
+
+    /**
+     * Static properties "prop1" and "prop2" plus a map-backed provider that also defines "prop2".
+     */
+    private Map<String, Object> propertiesWithProvider()
+    {
+      DynamicConfigProvider provider = new MapStringDynamicConfigProvider(ImmutableMap.of("prop2", "value2"));
+      return ImmutableMap.of(
+          "prop1", "value1",
+          "prop2", "value3",
+          DYNAMIC_CONFIG_PROVIDER, OBJECT_MAPPER.convertValue(provider, Map.class)
+      );
+    }
+  }
+}
diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md
index 002d6d2..cb1b3d2 100644
--- a/docs/ingestion/data-formats.md
+++ b/docs/ingestion/data-formats.md
@@ -380,8 +380,8 @@ For details, see the Schema Registry
[documentation](http://docs.confluent.io/cu
| url | String | Specifies the url endpoint of the Schema Registry. | yes |
| capacity | Integer | Specifies the max size of the cache (default =
Integer.MAX_VALUE). | no |
| urls | Array<String> | Specifies the url endpoints of the multiple Schema
Registry instances. | yes(if `url` is not provided) |
-| config | Json | To send additional configurations, configured for Schema
Registry | no |
-| headers | Json | To send headers to the Schema Registry | no |
+| config | Json | To send additional configurations, configured for Schema
Registry. This can be supplied via a
[DynamicConfigProvider](../operations/dynamic-config-provider.md) | no |
+| headers | Json | To send headers to the Schema Registry. This can be
supplied via a
[DynamicConfigProvider](../operations/dynamic-config-provider.md) | no |
For a single schema registry instance, use Field `url` or `urls` for multi
instances.
@@ -408,12 +408,20 @@ Multiple Instances:
"schema.registry.ssl.truststore.password": "<password>",
"schema.registry.ssl.keystore.location":
"/some/secrets/kafka.client.keystore.jks",
"schema.registry.ssl.keystore.password": "<password>",
- "schema.registry.ssl.key.password": "<password>"
+ "schema.registry.ssl.key.password": "<password>",
...
},
"headers": {
"traceID" : "b29c5de2-0db4-490b-b421",
"timeStamp" : "1577191871865",
+ "druid.dynamic.config.provider":{
+ "type":"mapString",
+ "config":{
+ "registry.header.prop.1":"value.1",
+ "registry.header.prop.2":"value.2"
+ }
+ }
...
}
}
@@ -1223,8 +1231,8 @@ For details, see the Schema Registry
[documentation](http://docs.confluent.io/cu
| url | String | Specifies the url endpoint of the Schema Registry. | yes |
| capacity | Integer | Specifies the max size of the cache (default =
Integer.MAX_VALUE). | no |
| urls | Array<String> | Specifies the url endpoints of the multiple Schema
Registry instances. | yes(if `url` is not provided) |
-| config | Json | To send additional configurations, configured for Schema
Registry | no |
-| headers | Json | To send headers to the Schema Registry | no |
+| config | Json | To send additional configurations, configured for Schema
Registry. This can be supplied via a
[DynamicConfigProvider](../operations/dynamic-config-provider.md). | no |
+| headers | Json | To send headers to the Schema Registry. This can be
supplied via a
[DynamicConfigProvider](../operations/dynamic-config-provider.md) | no |
For a single schema registry instance, use Field `url` or `urls` for multi
instances.
@@ -1259,6 +1267,13 @@ Multiple Instances:
"headers": {
"traceID" : "b29c5de2-0db4-490b-b421",
"timeStamp" : "1577191871865",
+ "druid.dynamic.config.provider":{
+ "type":"mapString",
+ "config":{
+ "registry.header.prop.1":"value.1",
+ "registry.header.prop.2":"value.2"
+ }
+ }
...
}
}
diff --git
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
index 4b3da38..59cb33e 100644
---
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
+++
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
@@ -19,8 +19,10 @@
package org.apache.druid.data.input.avro;
+import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
@@ -32,8 +34,10 @@ import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
+import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.utils.DynamicConfigProviderUtils;
import javax.annotation.Nullable;
import java.io.IOException;
@@ -48,16 +52,19 @@ public class SchemaRegistryBasedAvroBytesDecoder implements
AvroBytesDecoder
private final String url;
private final int capacity;
private final List<String> urls;
- private final Map<String, ?> config;
- private final Map<String, String> headers;
+ private final Map<String, Object> config;
+ private final Map<String, Object> headers;
+ private final ObjectMapper jsonMapper;
+ public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY =
"druid.dynamic.config.provider";
@JsonCreator
public SchemaRegistryBasedAvroBytesDecoder(
@JsonProperty("url") @Deprecated String url,
@JsonProperty("capacity") Integer capacity,
@JsonProperty("urls") @Nullable List<String> urls,
- @JsonProperty("config") @Nullable Map<String, ?> config,
- @JsonProperty("headers") @Nullable Map<String, String> headers
+ @JsonProperty("config") @Nullable Map<String, Object> config,
+ @JsonProperty("headers") @Nullable Map<String, Object> headers,
+ @JacksonInject @Json ObjectMapper jsonMapper
)
{
this.url = url;
@@ -65,10 +72,11 @@ public class SchemaRegistryBasedAvroBytesDecoder implements
AvroBytesDecoder
this.urls = urls;
this.config = config;
this.headers = headers;
+ this.jsonMapper = jsonMapper;
if (url != null && !url.isEmpty()) {
- this.registry = new CachedSchemaRegistryClient(this.url, this.capacity,
this.config, this.headers);
+ this.registry = new CachedSchemaRegistryClient(this.url, this.capacity,
DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config,
DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper),
DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers,
DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper));
} else {
- this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity,
this.config, this.headers);
+ this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity,
DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config,
DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper),
DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers,
DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper));
}
}
@@ -91,13 +99,13 @@ public class SchemaRegistryBasedAvroBytesDecoder implements
AvroBytesDecoder
}
@JsonProperty
- public Map<String, ?> getConfig()
+ public Map<String, Object> getConfig()
{
return config;
}
@JsonProperty
- public Map<String, String> getHeaders()
+ public Map<String, Object> getHeaders()
{
return headers;
}
@@ -112,6 +120,7 @@ public class SchemaRegistryBasedAvroBytesDecoder implements
AvroBytesDecoder
this.config = null;
this.headers = null;
this.registry = registry;
+ this.jsonMapper = new ObjectMapper();
}
@Override
diff --git
a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java
b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java
index 4213008..4a0c2df 100644
---
a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java
+++
b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.data.input;
+import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
@@ -108,6 +109,9 @@ public class AvroStreamInputFormatTest
for (Module jacksonModule : new
AvroExtensionsModule().getJacksonModules()) {
jsonMapper.registerModule(jacksonModule);
}
+ jsonMapper.setInjectableValues(
+ new InjectableValues.Std().addValue(ObjectMapper.class, new
DefaultObjectMapper())
+ );
}
@Test
@@ -151,7 +155,7 @@ public class AvroStreamInputFormatTest
{
AvroStreamInputFormat inputFormat = new AvroStreamInputFormat(
flattenSpec,
- new SchemaRegistryBasedAvroBytesDecoder("http://test:8081", 100, null,
null, null),
+ new SchemaRegistryBasedAvroBytesDecoder("http://test:8081", 100, null,
null, null, null),
false,
false
);
diff --git
a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
index 3eb6439..ec073c9 100644
---
a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
+++
b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
@@ -19,6 +19,8 @@
package org.apache.druid.data.input.avro;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
@@ -30,8 +32,10 @@ import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.druid.data.input.AvroStreamInputRowParserTest;
import org.apache.druid.data.input.SomeAvroDatum;
+import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.utils.DynamicConfigProviderUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -41,6 +45,7 @@ import org.mockito.Mockito;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Map;
public class SchemaRegistryBasedAvroBytesDecoderTest
{
@@ -56,7 +61,10 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
public void testMultipleUrls() throws Exception
{
String json = "{\"urls\":[\"http://localhost\"],\"type\":
\"schema_registry\"}";
- ObjectMapper mapper = new ObjectMapper();
+ ObjectMapper mapper = new DefaultObjectMapper();
+ mapper.setInjectableValues(
+ new InjectableValues.Std().addValue(ObjectMapper.class, new
DefaultObjectMapper())
+ );
SchemaRegistryBasedAvroBytesDecoder decoder;
decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
.readerFor(AvroBytesDecoder.class)
@@ -70,7 +78,10 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
public void testUrl() throws Exception
{
String json = "{\"url\":\"http://localhost\",\"type\":
\"schema_registry\"}";
- ObjectMapper mapper = new ObjectMapper();
+ ObjectMapper mapper = new DefaultObjectMapper();
+ mapper.setInjectableValues(
+ new InjectableValues.Std().addValue(ObjectMapper.class, new
DefaultObjectMapper())
+ );
SchemaRegistryBasedAvroBytesDecoder decoder;
decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
.readerFor(AvroBytesDecoder.class)
@@ -84,7 +95,10 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
public void testConfig() throws Exception
{
String json = "{\"url\":\"http://localhost\",\"type\":
\"schema_registry\", \"config\":{}}";
- ObjectMapper mapper = new ObjectMapper();
+ ObjectMapper mapper = new DefaultObjectMapper();
+ mapper.setInjectableValues(
+ new InjectableValues.Std().addValue(ObjectMapper.class, new
DefaultObjectMapper())
+ );
SchemaRegistryBasedAvroBytesDecoder decoder;
decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
.readerFor(AvroBytesDecoder.class)
@@ -163,4 +177,48 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out,
null));
return out.toByteArray();
}
+
+  /**
+   * Headers given both statically and through a "mapString" provider are merged, and the provider's
+   * value is used for the key that both define.
+   */
+  @Test
+  public void testParseHeader() throws JsonProcessingException
+  {
+    final String spec = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{},\"headers\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.header.prop.2\":\"value.2\", \"registry.header.prop.3\":\"value.3\"}},\"registry.header.prop.1\":\"value.1\",\"registry.header.prop.2\":\"value.4\"}}";
+    final ObjectMapper mapper = new DefaultObjectMapper();
+    final InjectableValues.Std injectables = new InjectableValues.Std();
+    injectables.addValue(ObjectMapper.class, new DefaultObjectMapper());
+    mapper.setInjectableValues(injectables);
+
+    final SchemaRegistryBasedAvroBytesDecoder decoder =
+        (SchemaRegistryBasedAvroBytesDecoder) mapper.readerFor(AvroBytesDecoder.class).readValue(spec);
+
+    final Map<String, String> resolvedHeaders = DynamicConfigProviderUtils.extraConfigAndSetStringMap(
+        decoder.getHeaders(),
+        SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY,
+        new DefaultObjectMapper()
+    );
+
+    // Then: the provider entry is gone and "value.2" from the provider replaced the static "value.4".
+    Assert.assertEquals(3, resolvedHeaders.size());
+    Assert.assertEquals("value.1", resolvedHeaders.get("registry.header.prop.1"));
+    Assert.assertEquals("value.2", resolvedHeaders.get("registry.header.prop.2"));
+    Assert.assertEquals("value.3", resolvedHeaders.get("registry.header.prop.3"));
+  }
+
+  /**
+   * Config given both statically and through a "mapString" provider is merged, and the provider's
+   * value is used for the key that both define.
+   */
+  @Test
+  public void testParseConfig() throws JsonProcessingException
+  {
+    String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.config.prop.2\":\"value.2\", \"registry.config.prop.3\":\"value.3\"}},\"registry.config.prop.1\":\"value.1\",\"registry.config.prop.2\":\"value.4\"},\"headers\":{}}";
+    ObjectMapper mapper = new DefaultObjectMapper();
+    mapper.setInjectableValues(
+        new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
+    );
+    SchemaRegistryBasedAvroBytesDecoder decoder;
+    decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
+        .readerFor(AvroBytesDecoder.class)
+        .readValue(json);
+
+    // Resolve through the object-map variant: it is the one the decoder's constructor applies to "config".
+    Map<String, ?> config = DynamicConfigProviderUtils.extraConfigAndSetObjectMap(
+        decoder.getConfig(),
+        SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY,
+        new DefaultObjectMapper()
+    );
+
+    // Then: the provider entry is gone and "value.2" from the provider replaced the static "value.4".
+    Assert.assertEquals(3, config.size());
+    Assert.assertEquals("value.1", config.get("registry.config.prop.1"));
+    Assert.assertEquals("value.2", config.get("registry.config.prop.2"));
+    Assert.assertEquals("value.3", config.get("registry.config.prop.3"));
+  }
}
diff --git
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java
index 2d4cc8d..17bb85a 100644
---
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java
+++
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java
@@ -19,8 +19,10 @@
package org.apache.druid.data.input.protobuf;
+import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
@@ -29,8 +31,10 @@ import
io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
+import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.utils.DynamicConfigProviderUtils;
import javax.annotation.Nullable;
@@ -50,16 +54,19 @@ public class SchemaRegistryBasedProtobufBytesDecoder
implements ProtobufBytesDec
private final String url;
private final int capacity;
private final List<String> urls;
- private final Map<String, ?> config;
- private final Map<String, String> headers;
+ private final Map<String, Object> config;
+ private final Map<String, Object> headers;
+ private final ObjectMapper jsonMapper;
+ public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY =
"druid.dynamic.config.provider";
@JsonCreator
public SchemaRegistryBasedProtobufBytesDecoder(
@JsonProperty("url") @Deprecated String url,
@JsonProperty("capacity") Integer capacity,
@JsonProperty("urls") @Nullable List<String> urls,
- @JsonProperty("config") @Nullable Map<String, ?> config,
- @JsonProperty("headers") @Nullable Map<String, String> headers
+ @JsonProperty("config") @Nullable Map<String, Object> config,
+ @JsonProperty("headers") @Nullable Map<String, Object> headers,
+ @JacksonInject @Json ObjectMapper jsonMapper
)
{
this.url = url;
@@ -67,10 +74,11 @@ public class SchemaRegistryBasedProtobufBytesDecoder
implements ProtobufBytesDec
this.urls = urls;
this.config = config;
this.headers = headers;
+ this.jsonMapper = jsonMapper;
if (url != null && !url.isEmpty()) {
- this.registry = new
CachedSchemaRegistryClient(Collections.singletonList(this.url), this.capacity,
Collections.singletonList(new ProtobufSchemaProvider()), this.config,
this.headers);
+ this.registry = new
CachedSchemaRegistryClient(Collections.singletonList(this.url), this.capacity,
Collections.singletonList(new ProtobufSchemaProvider()),
DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config,
DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper),
DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers,
DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper));
} else {
- this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity,
Collections.singletonList(new ProtobufSchemaProvider()), this.config,
this.headers);
+ this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity,
Collections.singletonList(new ProtobufSchemaProvider()),
DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config,
DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper),
DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers,
DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper));
}
}
@@ -93,13 +101,13 @@ public class SchemaRegistryBasedProtobufBytesDecoder
implements ProtobufBytesDec
}
@JsonProperty
- public Map<String, ?> getConfig()
+ public Map<String, Object> getConfig()
{
return config;
}
@JsonProperty
- public Map<String, String> getHeaders()
+ public Map<String, Object> getHeaders()
{
return headers;
}
@@ -119,6 +127,7 @@ public class SchemaRegistryBasedProtobufBytesDecoder
implements ProtobufBytesDec
this.config = null;
this.headers = null;
this.registry = registry;
+ this.jsonMapper = new ObjectMapper();
}
@Override
diff --git
a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
index e9e15ff..0a3e8c7 100644
---
a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
+++
b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.data.input.protobuf;
+import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
@@ -77,6 +78,9 @@ public class ProtobufInputFormatTest
for (Module jacksonModule : new
ProtobufExtensionsModule().getJacksonModules()) {
jsonMapper.registerModule(jacksonModule);
}
+ jsonMapper.setInjectableValues(
+ new InjectableValues.Std().addValue(ObjectMapper.class, new
DefaultObjectMapper())
+ );
}
@Test
@@ -99,7 +103,7 @@ public class ProtobufInputFormatTest
{
ProtobufInputFormat inputFormat = new ProtobufInputFormat(
flattenSpec,
- new SchemaRegistryBasedProtobufBytesDecoder("http://test:8081", 100,
null, null, null)
+ new SchemaRegistryBasedProtobufBytesDecoder("http://test:8081", 100,
null, null, null, null)
);
NestedInputFormat inputFormat2 = jsonMapper.readValue(
jsonMapper.writeValueAsString(inputFormat),
diff --git
a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java
b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java
index 0d77b11..009b5a6 100644
---
a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java
+++
b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoderTest.java
@@ -19,6 +19,8 @@
package org.apache.druid.data.input.protobuf;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.DynamicMessage;
@@ -26,7 +28,9 @@ import
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import org.apache.commons.io.IOUtils;
+import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.utils.DynamicConfigProviderUtils;
import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;
import org.junit.Assert;
@@ -40,6 +44,7 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
+import java.util.Map;
public class SchemaRegistryBasedProtobufBytesDecoderTest
{
@@ -93,7 +98,7 @@ public class SchemaRegistryBasedProtobufBytesDecoderTest
public void testDefaultCapacity()
{
// Given
- SchemaRegistryBasedProtobufBytesDecoder
schemaRegistryBasedProtobufBytesDecoder = new
SchemaRegistryBasedProtobufBytesDecoder("http://test", null, null, null, null);
+ SchemaRegistryBasedProtobufBytesDecoder
schemaRegistryBasedProtobufBytesDecoder = new
SchemaRegistryBasedProtobufBytesDecoder("http://test", null, null, null, null,
null);
// When
Assert.assertEquals(schemaRegistryBasedProtobufBytesDecoder.getIdentityMapCapacity(),
Integer.MAX_VALUE);
}
@@ -103,7 +108,7 @@ public class SchemaRegistryBasedProtobufBytesDecoderTest
{
int capacity = 100;
// Given
- SchemaRegistryBasedProtobufBytesDecoder
schemaRegistryBasedProtobufBytesDecoder = new
SchemaRegistryBasedProtobufBytesDecoder("http://test", capacity, null, null,
null);
+ SchemaRegistryBasedProtobufBytesDecoder
schemaRegistryBasedProtobufBytesDecoder = new
SchemaRegistryBasedProtobufBytesDecoder("http://test", capacity, null, null,
null, null);
// When
Assert.assertEquals(schemaRegistryBasedProtobufBytesDecoder.getIdentityMapCapacity(),
capacity);
}
@@ -120,7 +125,10 @@ public class SchemaRegistryBasedProtobufBytesDecoderTest
public void testMultipleUrls() throws Exception
{
String json = "{\"urls\":[\"http://localhost\"],\"type\":
\"schema_registry\"}";
- ObjectMapper mapper = new ObjectMapper();
+ ObjectMapper mapper = new DefaultObjectMapper();
+ mapper.setInjectableValues(
+ new InjectableValues.Std().addValue(ObjectMapper.class, new
DefaultObjectMapper())
+ );
SchemaRegistryBasedProtobufBytesDecoder decoder;
decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper
.readerFor(ProtobufBytesDecoder.class)
@@ -134,7 +142,10 @@ public class SchemaRegistryBasedProtobufBytesDecoderTest
public void testUrl() throws Exception
{
String json = "{\"url\":\"http://localhost\",\"type\":
\"schema_registry\"}";
- ObjectMapper mapper = new ObjectMapper();
+ ObjectMapper mapper = new DefaultObjectMapper();
+ mapper.setInjectableValues(
+ new InjectableValues.Std().addValue(ObjectMapper.class, new
DefaultObjectMapper())
+ );
SchemaRegistryBasedProtobufBytesDecoder decoder;
decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper
.readerFor(ProtobufBytesDecoder.class)
@@ -148,7 +159,10 @@ public class SchemaRegistryBasedProtobufBytesDecoderTest
public void testConfig() throws Exception
{
String json = "{\"url\":\"http://localhost\",\"type\":
\"schema_registry\", \"config\":{}}";
- ObjectMapper mapper = new ObjectMapper();
+ ObjectMapper mapper = new DefaultObjectMapper();
+ mapper.setInjectableValues(
+ new InjectableValues.Std().addValue(ObjectMapper.class, new
DefaultObjectMapper())
+ );
SchemaRegistryBasedProtobufBytesDecoder decoder;
decoder = (SchemaRegistryBasedProtobufBytesDecoder) mapper
.readerFor(ProtobufBytesDecoder.class)
@@ -158,6 +172,50 @@ public class SchemaRegistryBasedProtobufBytesDecoderTest
Assert.assertNotEquals(decoder.hashCode(), 0);
}
+  /**
+   * Headers given both statically and through a "mapString" provider are merged, and the provider's
+   * value is used for the key that both define.
+   */
+  @Test
+  public void testParseHeader() throws JsonProcessingException
+  {
+    final String spec = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{},\"headers\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.header.prop.2\":\"value.2\", \"registry.header.prop.3\":\"value.3\"}},\"registry.header.prop.1\":\"value.1\",\"registry.header.prop.2\":\"value.4\"}}";
+    final ObjectMapper mapper = new DefaultObjectMapper();
+    final InjectableValues.Std injectables = new InjectableValues.Std();
+    injectables.addValue(ObjectMapper.class, new DefaultObjectMapper());
+    mapper.setInjectableValues(injectables);
+
+    final SchemaRegistryBasedProtobufBytesDecoder decoder =
+        (SchemaRegistryBasedProtobufBytesDecoder) mapper.readerFor(ProtobufBytesDecoder.class).readValue(spec);
+
+    final Map<String, String> resolvedHeaders = DynamicConfigProviderUtils.extraConfigAndSetStringMap(
+        decoder.getHeaders(),
+        SchemaRegistryBasedProtobufBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY,
+        new DefaultObjectMapper()
+    );
+
+    // Then: the provider entry is gone and "value.2" from the provider replaced the static "value.4".
+    Assert.assertEquals(3, resolvedHeaders.size());
+    Assert.assertEquals("value.1", resolvedHeaders.get("registry.header.prop.1"));
+    Assert.assertEquals("value.2", resolvedHeaders.get("registry.header.prop.2"));
+    Assert.assertEquals("value.3", resolvedHeaders.get("registry.header.prop.3"));
+  }
+
+  /**
+   * Config given both statically and through a "mapString" provider is merged, and the provider's
+   * value is used for the key that both define.
+   */
+  @Test
+  public void testParseConfig() throws JsonProcessingException
+  {
+    final String spec = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.config.prop.2\":\"value.2\", \"registry.config.prop.3\":\"value.3\"}},\"registry.config.prop.1\":\"value.1\",\"registry.config.prop.2\":\"value.4\"},\"headers\":{}}";
+    final ObjectMapper mapper = new DefaultObjectMapper();
+    final InjectableValues.Std injectables = new InjectableValues.Std();
+    injectables.addValue(ObjectMapper.class, new DefaultObjectMapper());
+    mapper.setInjectableValues(injectables);
+
+    final SchemaRegistryBasedProtobufBytesDecoder decoder =
+        (SchemaRegistryBasedProtobufBytesDecoder) mapper.readerFor(ProtobufBytesDecoder.class).readValue(spec);
+
+    final Map<String, ?> resolvedConfig = DynamicConfigProviderUtils.extraConfigAndSetObjectMap(
+        decoder.getConfig(),
+        SchemaRegistryBasedProtobufBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY,
+        new DefaultObjectMapper()
+    );
+
+    // Then: the provider entry is gone and "value.2" from the provider replaced the static "value.4".
+    Assert.assertEquals(3, resolvedConfig.size());
+    Assert.assertEquals("value.1", resolvedConfig.get("registry.config.prop.1"));
+    Assert.assertEquals("value.2", resolvedConfig.get("registry.config.prop.2"));
+    Assert.assertEquals("value.3", resolvedConfig.get("registry.config.prop.3"));
+  }
+
private ProtobufSchema parseProtobufSchema() throws IOException
{
// Given
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]