[ https://issues.apache.org/jira/browse/KAFKA-5540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16494443#comment-16494443 ]
ASF GitHub Bot commented on KAFKA-5540: --------------------------------------- ewencp closed pull request #4693: KAFKA-5540: Deprecate internal converter configs URL: https://github.com/apache/kafka/pull/4693 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/build.gradle b/build.gradle index 31026d91e1e..78c94b9bfa3 100644 --- a/build.gradle +++ b/build.gradle @@ -1327,12 +1327,14 @@ project(':connect:runtime') { archivesBaseName = "connect-runtime" dependencies { + compile project(':connect:api') - compile project(":connect:transforms") compile project(':clients') compile project(':tools') - compile libs.slf4jApi + compile project(':connect:json') + compile project(':connect:transforms') + compile libs.slf4jApi compile libs.jacksonJaxrsJsonProvider compile libs.jerseyContainerServlet compile libs.jaxbApi // Jersey dependency that was available in the JDK before Java 9 @@ -1350,7 +1352,6 @@ project(':connect:runtime') { testCompile libs.powermockJunit4 testCompile libs.powermockEasymock - testCompile project(":connect:json") testCompile project(':clients').sourceSets.test.output testRuntime libs.slf4jlog4j diff --git a/config/connect-distributed.properties b/config/connect-distributed.properties index 5f3f35802cf..72db145f3f8 100644 --- a/config/connect-distributed.properties +++ b/config/connect-distributed.properties @@ -34,13 +34,6 @@ value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true -# The internal converter used for offsets, config, and status data is configurable and must be specified, but most users will -# always want to use the built-in default. Offset, config, and status data is never visible outside of Kafka Connect in this format. -internal.key.converter=org.apache.kafka.connect.json.JsonConverter -internal.value.converter=org.apache.kafka.connect.json.JsonConverter -internal.key.converter.schemas.enable=false -internal.value.converter.schemas.enable=false - # Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted. # Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create # the topic before starting Kafka Connect if a specific topic configuration is needed. diff --git a/config/connect-standalone.properties b/config/connect-standalone.properties index 0039796ddcc..a340a3bf315 100644 --- a/config/connect-standalone.properties +++ b/config/connect-standalone.properties @@ -25,13 +25,6 @@ value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true -# The internal converter used for offsets and config data is configurable and must be specified, but most users will -# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format. -internal.key.converter=org.apache.kafka.connect.json.JsonConverter -internal.value.converter=org.apache.kafka.connect.json.JsonConverter -internal.key.converter.schemas.enable=false -internal.value.converter.schemas.enable=false - offset.storage.file.filename=/tmp/connect.offsets # Flush much faster than normal, which is useful for testing/debugging offset.flush.interval.ms=10000 diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java index b857b0e0565..c9c32e7f066 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java @@ -23,7 +23,12 @@ import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.json.JsonConverterConfig; +import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.SimpleHeaderConverter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; @@ -37,6 +42,7 @@ * Common base class providing configuration for Kafka Connect workers, whether standalone or distributed. */ public class WorkerConfig extends AbstractConfig { + private static final Logger log = LoggerFactory.getLogger(WorkerConfig.class); public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers"; public static final String BOOTSTRAP_SERVERS_DOC @@ -73,6 +79,10 @@ " header values to strings and deserialize them by inferring the schemas."; public static final String HEADER_CONVERTER_CLASS_DEFAULT = SimpleHeaderConverter.class.getName(); + /** + * @deprecated As of 2.0.0 + */ + @Deprecated public static final String INTERNAL_KEY_CONVERTER_CLASS_CONFIG = "internal.key.converter"; public static final String INTERNAL_KEY_CONVERTER_CLASS_DOC = "Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka." + @@ -80,8 +90,13 @@ " independent of connectors it allows any connector to work with any serialization format." + " Examples of common formats include JSON and Avro." + " This setting controls the format used for internal bookkeeping data used by the framework, such as" + - " configs and offsets, so users can typically use any functioning Converter implementation."; + " configs and offsets, so users can typically use any functioning Converter implementation." + + " Deprecated; will be removed in an upcoming version."; + /** + * @deprecated As of 2.0.0 + */ + @Deprecated public static final String INTERNAL_VALUE_CONVERTER_CLASS_CONFIG = "internal.value.converter"; public static final String INTERNAL_VALUE_CONVERTER_CLASS_DOC = "Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka." + @@ -89,7 +104,10 @@ " independent of connectors it allows any connector to work with any serialization format." + " Examples of common formats include JSON and Avro." + " This setting controls the format used for internal bookkeeping data used by the framework, such as" + - " configs and offsets, so users can typically use any functioning Converter implementation."; + " configs and offsets, so users can typically use any functioning Converter implementation." + + " Deprecated; will be removed in an upcoming version."; + + private static final Class<? extends Converter> INTERNAL_CONVERTER_DEFAULT = JsonConverter.class; public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG = "task.shutdown.graceful.timeout.ms"; @@ -190,9 +208,9 @@ protected static ConfigDef baseConfigDef() { Importance.HIGH, KEY_CONVERTER_CLASS_DOC) .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_CONVERTER_CLASS_DOC) - .define(INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, + .define(INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, INTERNAL_CONVERTER_DEFAULT, Importance.LOW, INTERNAL_KEY_CONVERTER_CLASS_DOC) - .define(INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, + .define(INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, INTERNAL_CONVERTER_DEFAULT, Importance.LOW, INTERNAL_VALUE_CONVERTER_CLASS_DOC) .define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG, TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW, @@ -239,6 +257,58 @@ protected static ConfigDef baseConfigDef() { Importance.LOW, HEADER_CONVERTER_CLASS_DOC); } + private void logInternalConverterDeprecationWarnings(Map<String, String> props) { + String[] deprecatedConfigs = new String[] { + INTERNAL_KEY_CONVERTER_CLASS_CONFIG, + INTERNAL_VALUE_CONVERTER_CLASS_CONFIG + }; + for (String config : deprecatedConfigs) { + if (props.containsKey(config)) { + Class<?> internalConverterClass = getClass(config); + logDeprecatedProperty(config, internalConverterClass.getCanonicalName(), INTERNAL_CONVERTER_DEFAULT.getCanonicalName(), null); + if (internalConverterClass.equals(INTERNAL_CONVERTER_DEFAULT)) { + // log the properties for this converter ... + for (Map.Entry<String, Object> propEntry : originalsWithPrefix(config + ".").entrySet()) { + String prop = propEntry.getKey(); + String propValue = propEntry.getValue().toString(); + String defaultValue = JsonConverterConfig.SCHEMAS_ENABLE_CONFIG.equals(prop) ? "false" : null; + logDeprecatedProperty(config + "." + prop, propValue, defaultValue, config); + } + } + } + } + } + + private void logDeprecatedProperty(String propName, String propValue, String defaultValue, String prefix) { + String prefixNotice = prefix != null + ? " (along with all configuration for '" + prefix + "')" + : ""; + if (defaultValue != null && defaultValue.equalsIgnoreCase(propValue)) { + log.info( + "Worker configuration property '{}'{} is deprecated and may be removed in an upcoming release. " + + "The specified value matches the default, so this property can be safely removed from the worker configuration.", + propName, + prefixNotice, + propValue + ); + } else if (defaultValue != null) { + log.warn( + "Worker configuration property '{}'{} is deprecated and may be removed in an upcoming release. " + + "The specified value '{}' does NOT match the default and recommended value '{}'.", + propName, + prefixNotice, + propValue, + defaultValue + ); + } else { + log.warn( + "Worker configuration property '{}'{} is deprecated and may be removed in an upcoming release.", + propName, + prefixNotice + ); + } + } + @Override protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) { return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues); @@ -253,5 +323,6 @@ protected static ConfigDef baseConfigDef() { public WorkerConfig(ConfigDef definition, Map<String, String> props) { super(definition, props); + logInternalConverterDeprecationWarnings(props); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java index f4cd2ba14b0..30c41cd7d17 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java @@ -23,6 +23,8 @@ import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.json.JsonConverterConfig; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.ConverterConfig; @@ -97,6 +99,11 @@ public Object run() { ); } + protected static boolean isInternalConverter(String classPropertyName) { + return classPropertyName.equals(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG) + || classPropertyName.equals(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG); + } + public static ClassLoader compareAndSwapLoaders(ClassLoader loader) { ClassLoader current = Thread.currentThread().getContextClassLoader(); if (!current.equals(loader)) { @@ -195,8 +202,9 @@ public Task newTask(Class<? extends Task> taskClass) { * @throws ConnectException if the {@link Converter} implementation class could not be found */ public Converter newConverter(AbstractConfig config, String classPropertyName, ClassLoaderUsage classLoaderUsage) { - if (!config.originals().containsKey(classPropertyName)) { - // This configuration does not define the converter via the specified property name + if (!config.originals().containsKey(classPropertyName) && !isInternalConverter(classPropertyName)) { + // This configuration does not define the converter via the specified property name, and + // it does not represent an internal converter (which has a default available) return null; } Converter plugin = null; @@ -236,6 +244,18 @@ public Converter newConverter(AbstractConfig config, String classPropertyName, C Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix); log.debug("Configuring the {} converter with configuration:{}{}", isKeyConverter ? "key" : "value", System.lineSeparator(), converterConfig); + + // Have to override schemas.enable from true to false for internal JSON converters + // Don't have to warn the user about anything since all deprecation warnings take place in the + // WorkerConfig class + if (plugin instanceof JsonConverter && isInternalConverter(classPropertyName)) { + // If they haven't explicitly specified values for internal.key.converter.schemas.enable + // or internal.value.converter.schemas.enable, we can safely default them to false + if (!converterConfig.containsKey(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG)) { + converterConfig.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false); + } + } + plugin.configure(converterConfig, isKeyConverter); return plugin; } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java index a9a944fa360..877fe6b600f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.json.JsonConverterConfig; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage; @@ -51,6 +52,7 @@ private AbstractConfig config; private TestConverter converter; private TestHeaderConverter headerConverter; + private TestInternalConverter internalConverter; @BeforeClass public static void beforeAll() { @@ -71,10 +73,8 @@ public void setup() { props.put("value.converter." + JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "true"); props.put("key.converter.extra.config", "foo1"); props.put("value.converter.extra.config", "foo2"); - props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, TestConverter.class.getName()); - props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, TestConverter.class.getName()); - props.put("internal.key.converter." + JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"); - props.put("internal.value.converter." + JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"); + props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, TestInternalConverter.class.getName()); + props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, TestInternalConverter.class.getName()); props.put("internal.key.converter.extra.config", "bar1"); props.put("internal.value.converter.extra.config", "bar2"); props.put(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, TestHeaderConverter.class.getName()); @@ -102,15 +102,17 @@ public void shouldInstantiateAndConfigureConverters() { @Test public void shouldInstantiateAndConfigureInternalConverters() { - instantiateAndConfigureConverter(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.CURRENT_CLASSLOADER); - // Validate extra configs got passed through to overridden converters - assertEquals("false", converter.configs.get(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG)); - assertEquals("bar1", converter.configs.get("extra.config")); - - instantiateAndConfigureConverter(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS); - // Validate extra configs got passed through to overridden converters - assertEquals("false", converter.configs.get(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG)); - assertEquals("bar2", converter.configs.get("extra.config")); + instantiateAndConfigureInternalConverter(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.CURRENT_CLASSLOADER); + // Validate schemas.enable is defaulted to false for internal converter + assertEquals(false, internalConverter.configs.get(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG)); + // Validate internal converter properties can still be set + assertEquals("bar1", internalConverter.configs.get("extra.config")); + + instantiateAndConfigureInternalConverter(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS); + // Validate schemas.enable is defaulted to false for internal converter + assertEquals(false, internalConverter.configs.get(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG)); + // Validate internal converter properties can still be set + assertEquals("bar2", internalConverter.configs.get("extra.config")); } @Test @@ -163,6 +165,16 @@ protected void instantiateAndConfigureConverter(String configPropName, ClassLoad assertNotNull(converter); } + protected void instantiateAndConfigureHeaderConverter(String configPropName) { + headerConverter = (TestHeaderConverter) plugins.newHeaderConverter(config, configPropName, ClassLoaderUsage.CURRENT_CLASSLOADER); + assertNotNull(headerConverter); + } + + protected void instantiateAndConfigureInternalConverter(String configPropName, ClassLoaderUsage classLoaderUsage) { + internalConverter = (TestInternalConverter) plugins.newConverter(config, configPropName, classLoaderUsage); + assertNotNull(internalConverter); + } + protected void assertConverterType(ConverterType type, Map<String, ?> props) { assertEquals(type.getName(), props.get(ConverterConfig.TYPE_CONFIG)); } @@ -230,4 +242,13 @@ public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] val public void close() throws IOException { } } -} \ No newline at end of file + + public static class TestInternalConverter extends JsonConverter { + public Map<String, ?> configs; + + public void configure(Map<String, ?> configs) { + this.configs = configs; + super.configure(configs); + } + } +} diff --git a/docs/upgrade.html b/docs/upgrade.html index 451f1038645..601ec6951b1 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -77,6 +77,15 @@ <h5><a id="upgrade_200_notable" href="#upgrade_200_notable">Notable changes in 2 <li>New Kafka Streams configuration parameter <code>upgrade.from</code> added that allows rolling bounce upgrade from older version. </li> <li><a href="https://cwiki.apache.org/confluence/x/DVyHB">KIP-284</a> changed the retention time for Kafka Streams repartition topics by setting its default value to <code>Long.MAX_VALUE</code>.</li> <li>Updated <code>ProcessorStateManager</code> APIs in Kafka Streams for registering state stores to the processor topology. For more details please read the Streams <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_200">Upgrade Guide</a>.</li> + <li> + In earlier releases, Connect's worker configuration required the <code>internal.key.converter</code> and <code>internal.value.converter</code> properties. + In 2.0, these are <a href="https://cwiki.apache.org/confluence/x/AZQ7B">no longer required</a> and default to the JSON converter. + You may safely remove these properties from your Connect standalone and distributed worker configurations:<br /> + <code>internal.key.converter=org.apache.kafka.connect.json.JsonConverter</code> + <code>internal.key.converter.schemas.enable=false</code> + <code>internal.value.converter=org.apache.kafka.connect.json.JsonConverter</code> + <code>internal.value.converter.schemas.enable=false</code> + </li> </ul> <h5><a id="upgrade_200_new_protocols" href="#upgrade_200_new_protocols">New Protocol Versions</a></h5> diff --git a/tests/kafkatest/tests/connect/templates/connect-distributed.properties b/tests/kafkatest/tests/connect/templates/connect-distributed.properties index a1d3de29d51..186773e7d1a 100644 --- a/tests/kafkatest/tests/connect/templates/connect-distributed.properties +++ b/tests/kafkatest/tests/connect/templates/connect-distributed.properties @@ -29,11 +29,6 @@ key.converter.schemas.enable={{ schemas|default(True)|string|lower }} value.converter.schemas.enable={{ schemas|default(True)|string|lower }} {% endif %} -internal.key.converter=org.apache.kafka.connect.json.JsonConverter -internal.value.converter=org.apache.kafka.connect.json.JsonConverter -internal.key.converter.schemas.enable=false -internal.value.converter.schemas.enable=false - offset.storage.topic={{ OFFSETS_TOPIC }} offset.storage.replication.factor={{ OFFSETS_REPLICATION_FACTOR }} offset.storage.partitions={{ OFFSETS_PARTITIONS }} diff --git a/tests/kafkatest/tests/connect/templates/connect-standalone.properties b/tests/kafkatest/tests/connect/templates/connect-standalone.properties index 5f079f7a396..a8eaa44832e 100644 --- a/tests/kafkatest/tests/connect/templates/connect-standalone.properties +++ b/tests/kafkatest/tests/connect/templates/connect-standalone.properties @@ -27,11 +27,6 @@ key.converter.schemas.enable={{ schemas|default(True)|string|lower }} value.converter.schemas.enable={{ schemas|default(True)|string|lower }} {% endif %} -internal.key.converter=org.apache.kafka.connect.json.JsonConverter -internal.value.converter=org.apache.kafka.connect.json.JsonConverter -internal.key.converter.schemas.enable=false -internal.value.converter.schemas.enable=false - offset.storage.file.filename={{ OFFSETS_FILE }} # Reduce the admin client request timeouts so that we don't wait the default 120 sec before failing to connect the admin client ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Deprecate and remove internal converter configs > ----------------------------------------------- > > Key: KAFKA-5540 > URL: https://issues.apache.org/jira/browse/KAFKA-5540 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect > Affects Versions: 0.11.0.0 > Reporter: Ewen Cheslack-Postava > Assignee: Chris Egerton > Priority: Major > Labels: needs-kip > Fix For: 2.0.0 > > > The internal.key.converter and internal.value.converter were original exposed > as configs because a) they are actually pluggable and b) providing a default > would require relying on the JsonConverter always being available, which > until we had classloader isolation it was possible might be removed for > compatibility reasons. > However, this has ultimately just caused a lot more trouble and confusion > than it is worth. We should deprecate the configs, give them a default of > JsonConverter (which is also kind of nice since it results in human-readable > data in the internal topics), and then ultimately remove them in the next > major version. > These are all public APIs so this will need a small KIP before we can make > the change. -- This message was sent by Atlassian JIRA (v7.6.3#76005)