Repository: kafka Updated Branches: refs/heads/trunk d5366471d -> 3cf2de069
KAFKA-3723: Cannot change size of schema cache for JSON converter Author: Christian Posta <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #1401 from christian-posta/ceposta-connect-class-cast-error Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3cf2de06 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3cf2de06 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3cf2de06 Branch: refs/heads/trunk Commit: 3cf2de0694cf0e276d25d8c7048a9928b41969a3 Parents: d536647 Author: Christian Posta <[email protected]> Authored: Thu May 26 14:13:54 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Thu May 26 14:13:54 2016 -0700 ---------------------------------------------------------------------- .../apache/kafka/connect/json/JsonConverter.java | 2 +- .../kafka/connect/json/JsonConverterTest.java | 17 +++++++++++++++++ .../src/test/resources/connect-test.properties | 2 ++ .../runtime/distributed/DistributedHerderTest.java | 2 -- 4 files changed, 20 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3cf2de06/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java ---------------------------------------------------------------------- diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java index d9a6859..59e653e 100644 --- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java +++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java @@ -304,7 +304,7 @@ public class JsonConverter implements Converter { Object cacheSizeVal = configs.get(SCHEMAS_CACHE_SIZE_CONFIG); if (cacheSizeVal != null) - cacheSize = (int) cacheSizeVal; + cacheSize = Integer.parseInt((String) cacheSizeVal); fromConnectSchemaCache = new SynchronizedCache<>(new LRUCache<Schema, ObjectNode>(cacheSize)); toConnectSchemaCache = new SynchronizedCache<>(new LRUCache<JsonNode, Schema>(cacheSize)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/3cf2de06/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java ---------------------------------------------------------------------- diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java index c923285..0e7c153 100644 --- a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java +++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -36,10 +37,13 @@ import org.junit.Before; import org.junit.Test; import org.powermock.reflect.Whitebox; +import java.io.File; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.math.BigDecimal; import java.math.BigInteger; +import java.net.URISyntaxException; +import java.net.URL; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Calendar; @@ -607,6 +611,19 @@ public class JsonConverterTest { assertEquals(2, cache.size()); } + @Test + public void testJsonSchemaCacheSizeFromConfigFile() throws URISyntaxException, IOException { + URL url = getClass().getResource("/connect-test.properties"); + File propFile = new File(url.toURI()); + String workerPropsFile = propFile.getAbsolutePath(); + Map<String, String> workerProps = !workerPropsFile.isEmpty() ? + Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap(); + + JsonConverter rc = new JsonConverter(); + rc.configure(workerProps, false); + + } + private JsonNode parse(byte[] json) { try { http://git-wip-us.apache.org/repos/asf/kafka/blob/3cf2de06/connect/json/src/test/resources/connect-test.properties ---------------------------------------------------------------------- diff --git a/connect/json/src/test/resources/connect-test.properties b/connect/json/src/test/resources/connect-test.properties new file mode 100644 index 0000000..5b7e788 --- /dev/null +++ b/connect/json/src/test/resources/connect-test.properties @@ -0,0 +1,2 @@ +schemas.cache.size=1 + http://git-wip-us.apache.org/repos/asf/kafka/blob/3cf2de06/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 81e6be8..747db1a 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -81,9 +81,7 @@ public class DistributedHerderTest { HERDER_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); HERDER_CONFIG.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); HERDER_CONFIG.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); - HERDER_CONFIG.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "connect-configs"); HERDER_CONFIG.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets"); - HERDER_CONFIG.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic"); } private static final String MEMBER_URL = "memberUrl";
