[ 
https://issues.apache.org/jira/browse/KAFKA-5540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16494443#comment-16494443
 ] 

ASF GitHub Bot commented on KAFKA-5540:
---------------------------------------

ewencp closed pull request #4693: KAFKA-5540: Deprecate internal converter 
configs
URL: https://github.com/apache/kafka/pull/4693
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/build.gradle b/build.gradle
index 31026d91e1e..78c94b9bfa3 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1327,12 +1327,14 @@ project(':connect:runtime') {
   archivesBaseName = "connect-runtime"
 
   dependencies {
+
     compile project(':connect:api')
-    compile project(":connect:transforms")
     compile project(':clients')
     compile project(':tools')
-    compile libs.slf4jApi
+    compile project(':connect:json')
+    compile project(':connect:transforms')
 
+    compile libs.slf4jApi
     compile libs.jacksonJaxrsJsonProvider
     compile libs.jerseyContainerServlet
     compile libs.jaxbApi // Jersey dependency that was available in the JDK 
before Java 9
@@ -1350,7 +1352,6 @@ project(':connect:runtime') {
     testCompile libs.powermockJunit4
     testCompile libs.powermockEasymock
 
-    testCompile project(":connect:json")
     testCompile project(':clients').sourceSets.test.output
 
     testRuntime libs.slf4jlog4j
diff --git a/config/connect-distributed.properties 
b/config/connect-distributed.properties
index 5f3f35802cf..72db145f3f8 100644
--- a/config/connect-distributed.properties
+++ b/config/connect-distributed.properties
@@ -34,13 +34,6 @@ value.converter=org.apache.kafka.connect.json.JsonConverter
 key.converter.schemas.enable=true
 value.converter.schemas.enable=true
 
-# The internal converter used for offsets, config, and status data is 
configurable and must be specified, but most users will
-# always want to use the built-in default. Offset, config, and status data is 
never visible outside of Kafka Connect in this format.
-internal.key.converter=org.apache.kafka.connect.json.JsonConverter
-internal.value.converter=org.apache.kafka.connect.json.JsonConverter
-internal.key.converter.schemas.enable=false
-internal.value.converter.schemas.enable=false
-
 # Topic to use for storing offsets. This topic should have many partitions and 
be replicated and compacted.
 # Kafka Connect will attempt to create the topic automatically when needed, 
but you can always manually create
 # the topic before starting Kafka Connect if a specific topic configuration is 
needed.
diff --git a/config/connect-standalone.properties 
b/config/connect-standalone.properties
index 0039796ddcc..a340a3bf315 100644
--- a/config/connect-standalone.properties
+++ b/config/connect-standalone.properties
@@ -25,13 +25,6 @@ value.converter=org.apache.kafka.connect.json.JsonConverter
 key.converter.schemas.enable=true
 value.converter.schemas.enable=true
 
-# The internal converter used for offsets and config data is configurable and 
must be specified, but most users will
-# always want to use the built-in default. Offset and config data is never 
visible outside of Kafka Connect in this format.
-internal.key.converter=org.apache.kafka.connect.json.JsonConverter
-internal.value.converter=org.apache.kafka.connect.json.JsonConverter
-internal.key.converter.schemas.enable=false
-internal.value.converter.schemas.enable=false
-
 offset.storage.file.filename=/tmp/connect.offsets
 # Flush much faster than normal, which is useful for testing/debugging
 offset.flush.interval.ms=10000
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index b857b0e0565..c9c32e7f066 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -23,7 +23,12 @@
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.SimpleHeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -37,6 +42,7 @@
  * Common base class providing configuration for Kafka Connect workers, 
whether standalone or distributed.
  */
 public class WorkerConfig extends AbstractConfig {
+    private static final Logger log = 
LoggerFactory.getLogger(WorkerConfig.class);
 
     public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
     public static final String BOOTSTRAP_SERVERS_DOC
@@ -73,6 +79,10 @@
                     " header values to strings and deserialize them by 
inferring the schemas.";
     public static final String HEADER_CONVERTER_CLASS_DEFAULT = 
SimpleHeaderConverter.class.getName();
 
+    /**
+     * @deprecated As of 2.0.0
+     */
+    @Deprecated
     public static final String INTERNAL_KEY_CONVERTER_CLASS_CONFIG = 
"internal.key.converter";
     public static final String INTERNAL_KEY_CONVERTER_CLASS_DOC =
             "Converter class used to convert between Kafka Connect format and 
the serialized form that is written to Kafka." +
@@ -80,8 +90,13 @@
                     " independent of connectors it allows any connector to 
work with any serialization format." +
                     " Examples of common formats include JSON and Avro." +
                     " This setting controls the format used for internal 
bookkeeping data used by the framework, such as" +
-                    " configs and offsets, so users can typically use any 
functioning Converter implementation.";
+                    " configs and offsets, so users can typically use any 
functioning Converter implementation." +
+                    " Deprecated; will be removed in an upcoming version.";
 
+    /**
+     * @deprecated As of 2.0.0
+     */
+    @Deprecated
     public static final String INTERNAL_VALUE_CONVERTER_CLASS_CONFIG = 
"internal.value.converter";
     public static final String INTERNAL_VALUE_CONVERTER_CLASS_DOC =
             "Converter class used to convert between Kafka Connect format and 
the serialized form that is written to Kafka." +
@@ -89,7 +104,10 @@
                     " independent of connectors it allows any connector to 
work with any serialization format." +
                     " Examples of common formats include JSON and Avro." +
                     " This setting controls the format used for internal 
bookkeeping data used by the framework, such as" +
-                    " configs and offsets, so users can typically use any 
functioning Converter implementation.";
+                    " configs and offsets, so users can typically use any 
functioning Converter implementation." +
+                    " Deprecated; will be removed in an upcoming version.";
+
+    private static final Class<? extends Converter> INTERNAL_CONVERTER_DEFAULT 
= JsonConverter.class;
 
     public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG
             = "task.shutdown.graceful.timeout.ms";
@@ -190,9 +208,9 @@ protected static ConfigDef baseConfigDef() {
                         Importance.HIGH, KEY_CONVERTER_CLASS_DOC)
                 .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
                         Importance.HIGH, VALUE_CONVERTER_CLASS_DOC)
-                .define(INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
+                .define(INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, 
INTERNAL_CONVERTER_DEFAULT,
                         Importance.LOW, INTERNAL_KEY_CONVERTER_CLASS_DOC)
-                .define(INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
+                .define(INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, 
INTERNAL_CONVERTER_DEFAULT,
                         Importance.LOW, INTERNAL_VALUE_CONVERTER_CLASS_DOC)
                 .define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG,
                         TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, 
Importance.LOW,
@@ -239,6 +257,58 @@ protected static ConfigDef baseConfigDef() {
                         Importance.LOW, HEADER_CONVERTER_CLASS_DOC);
     }
 
+    private void logInternalConverterDeprecationWarnings(Map<String, String> 
props) {
+        String[] deprecatedConfigs = new String[] {
+            INTERNAL_KEY_CONVERTER_CLASS_CONFIG,
+            INTERNAL_VALUE_CONVERTER_CLASS_CONFIG
+        };
+        for (String config : deprecatedConfigs) {
+            if (props.containsKey(config)) {
+                Class<?> internalConverterClass = getClass(config);
+                logDeprecatedProperty(config, 
internalConverterClass.getCanonicalName(), 
INTERNAL_CONVERTER_DEFAULT.getCanonicalName(), null);
+                if (internalConverterClass.equals(INTERNAL_CONVERTER_DEFAULT)) 
{
+                    // log the properties for this converter ...
+                    for (Map.Entry<String, Object> propEntry : 
originalsWithPrefix(config + ".").entrySet()) {
+                        String prop = propEntry.getKey();
+                        String propValue = propEntry.getValue().toString();
+                        String defaultValue = 
JsonConverterConfig.SCHEMAS_ENABLE_CONFIG.equals(prop) ? "false" : null;
+                        logDeprecatedProperty(config + "." + prop, propValue, 
defaultValue, config);
+                    }
+                }
+            }
+        }
+    }
+
+    private void logDeprecatedProperty(String propName, String propValue, 
String defaultValue, String prefix) {
+        String prefixNotice = prefix != null
+            ? " (along with all configuration for '" + prefix + "')"
+            : "";
+        if (defaultValue != null && defaultValue.equalsIgnoreCase(propValue)) {
+            log.info(
+                "Worker configuration property '{}'{} is deprecated and may be 
removed in an upcoming release. "
+                    + "The specified value matches the default, so this 
property can be safely removed from the worker configuration.",
+                propName,
+                prefixNotice,
+                propValue
+            );
+        } else if (defaultValue != null) {
+            log.warn(
+                "Worker configuration property '{}'{} is deprecated and may be 
removed in an upcoming release. "
+                    + "The specified value '{}' does NOT match the default and 
recommended value '{}'.",
+                propName,
+                prefixNotice,
+                propValue,
+                defaultValue
+            );
+        } else {
+            log.warn(
+                "Worker configuration property '{}'{} is deprecated and may be 
removed in an upcoming release.",
+                propName,
+                prefixNotice
+            );
+        }
+    }
+
     @Override
     protected Map<String, Object> postProcessParsedConfig(final Map<String, 
Object> parsedValues) {
         return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, 
parsedValues);
@@ -253,5 +323,6 @@ protected static ConfigDef baseConfigDef() {
 
     public WorkerConfig(ConfigDef definition, Map<String, String> props) {
         super(definition, props);
+        logInternalConverterDeprecationWarnings(props);
     }
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index f4cd2ba14b0..30c41cd7d17 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -23,6 +23,8 @@
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.ConverterConfig;
@@ -97,6 +99,11 @@ public Object run() {
         );
     }
 
+    protected static boolean isInternalConverter(String classPropertyName) {
+        return 
classPropertyName.equals(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG)
+            || 
classPropertyName.equals(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG);
+    }
+
     public static ClassLoader compareAndSwapLoaders(ClassLoader loader) {
         ClassLoader current = Thread.currentThread().getContextClassLoader();
         if (!current.equals(loader)) {
@@ -195,8 +202,9 @@ public Task newTask(Class<? extends Task> taskClass) {
      * @throws ConnectException if the {@link Converter} implementation class 
could not be found
      */
     public Converter newConverter(AbstractConfig config, String 
classPropertyName, ClassLoaderUsage classLoaderUsage) {
-        if (!config.originals().containsKey(classPropertyName)) {
-            // This configuration does not define the converter via the 
specified property name
+        if (!config.originals().containsKey(classPropertyName) && 
!isInternalConverter(classPropertyName)) {
+            // This configuration does not define the converter via the 
specified property name, and
+            // it does not represent an internal converter (which has a 
default available)
             return null;
         }
         Converter plugin = null;
@@ -236,6 +244,18 @@ public Converter newConverter(AbstractConfig config, 
String classPropertyName, C
         Map<String, Object> converterConfig = 
config.originalsWithPrefix(configPrefix);
         log.debug("Configuring the {} converter with configuration:{}{}",
                   isKeyConverter ? "key" : "value", System.lineSeparator(), 
converterConfig);
+
+        // Have to override schemas.enable from true to false for internal 
JSON converters
+        // Don't have to warn the user about anything since all deprecation 
warnings take place in the
+        // WorkerConfig class
+        if (plugin instanceof JsonConverter && 
isInternalConverter(classPropertyName)) {
+            // If they haven't explicitly specified values for 
internal.key.converter.schemas.enable
+            // or internal.value.converter.schemas.enable, we can safely 
default them to false
+            if 
(!converterConfig.containsKey(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG)) {
+                converterConfig.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, 
false);
+            }
+        }
+
         plugin.configure(converterConfig, isKeyConverter);
         return plugin;
     }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index a9a944fa360..877fe6b600f 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -22,6 +22,7 @@
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
@@ -51,6 +52,7 @@
     private AbstractConfig config;
     private TestConverter converter;
     private TestHeaderConverter headerConverter;
+    private TestInternalConverter internalConverter;
 
     @BeforeClass
     public static void beforeAll() {
@@ -71,10 +73,8 @@ public void setup() {
         props.put("value.converter." + 
JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "true");
         props.put("key.converter.extra.config", "foo1");
         props.put("value.converter.extra.config", "foo2");
-        props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, 
TestConverter.class.getName());
-        props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, 
TestConverter.class.getName());
-        props.put("internal.key.converter." + 
JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false");
-        props.put("internal.value.converter." + 
JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false");
+        props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, 
TestInternalConverter.class.getName());
+        props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, 
TestInternalConverter.class.getName());
         props.put("internal.key.converter.extra.config", "bar1");
         props.put("internal.value.converter.extra.config", "bar2");
         props.put(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, 
TestHeaderConverter.class.getName());
@@ -102,15 +102,17 @@ public void shouldInstantiateAndConfigureConverters() {
 
     @Test
     public void shouldInstantiateAndConfigureInternalConverters() {
-        
instantiateAndConfigureConverter(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG,
 ClassLoaderUsage.CURRENT_CLASSLOADER);
-        // Validate extra configs got passed through to overridden converters
-        assertEquals("false", 
converter.configs.get(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG));
-        assertEquals("bar1", converter.configs.get("extra.config"));
-
-        
instantiateAndConfigureConverter(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG,
 ClassLoaderUsage.PLUGINS);
-        // Validate extra configs got passed through to overridden converters
-        assertEquals("false", 
converter.configs.get(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG));
-        assertEquals("bar2", converter.configs.get("extra.config"));
+        
instantiateAndConfigureInternalConverter(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG,
 ClassLoaderUsage.CURRENT_CLASSLOADER);
+        // Validate schemas.enable is defaulted to false for internal converter
+        assertEquals(false, 
internalConverter.configs.get(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG));
+        // Validate internal converter properties can still be set
+        assertEquals("bar1", internalConverter.configs.get("extra.config"));
+
+        
instantiateAndConfigureInternalConverter(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG,
 ClassLoaderUsage.PLUGINS);
+        // Validate schemas.enable is defaulted to false for internal converter
+        assertEquals(false, 
internalConverter.configs.get(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG));
+        // Validate internal converter properties can still be set
+        assertEquals("bar2", internalConverter.configs.get("extra.config"));
     }
 
     @Test
@@ -163,6 +165,16 @@ protected void instantiateAndConfigureConverter(String 
configPropName, ClassLoad
         assertNotNull(converter);
     }
 
+    protected void instantiateAndConfigureHeaderConverter(String 
configPropName) {
+        headerConverter = (TestHeaderConverter) 
plugins.newHeaderConverter(config, configPropName, 
ClassLoaderUsage.CURRENT_CLASSLOADER);
+        assertNotNull(headerConverter);
+    }
+
+    protected void instantiateAndConfigureInternalConverter(String 
configPropName, ClassLoaderUsage classLoaderUsage) {
+        internalConverter = (TestInternalConverter) 
plugins.newConverter(config, configPropName, classLoaderUsage);
+        assertNotNull(internalConverter);
+    }
+
     protected void assertConverterType(ConverterType type, Map<String, ?> 
props) {
         assertEquals(type.getName(), props.get(ConverterConfig.TYPE_CONFIG));
     }
@@ -230,4 +242,13 @@ public SchemaAndValue toConnectHeader(String topic, String 
headerKey, byte[] val
         public void close() throws IOException {
         }
     }
-}
\ No newline at end of file
+
+    public static class TestInternalConverter extends JsonConverter {
+        public Map<String, ?> configs;
+
+        public void configure(Map<String, ?> configs) {
+            this.configs = configs;
+            super.configure(configs);
+        }
+    }
+}
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 451f1038645..601ec6951b1 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -77,6 +77,15 @@ <h5><a id="upgrade_200_notable" 
href="#upgrade_200_notable">Notable changes in 2
     <li>New Kafka Streams configuration parameter <code>upgrade.from</code> 
added that allows rolling bounce upgrade from older version. </li>
     <li><a href="https://cwiki.apache.org/confluence/x/DVyHB">KIP-284</a> 
changed the retention time for Kafka Streams repartition topics by setting its 
default value to <code>Long.MAX_VALUE</code>.</li>
     <li>Updated <code>ProcessorStateManager</code> APIs in Kafka Streams for 
registering state stores to the processor topology. For more details please 
read the Streams <a 
href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_200">Upgrade
 Guide</a>.</li>
+    <li>
+        In earlier releases, Connect's worker configuration required the 
<code>internal.key.converter</code> and <code>internal.value.converter</code> 
properties.
+        In 2.0, these are <a 
href="https://cwiki.apache.org/confluence/x/AZQ7B">no longer required</a> and 
default to the JSON converter.
+        You may safely remove these properties from your Connect standalone 
and distributed worker configurations:<br />
+        
<code>internal.key.converter=org.apache.kafka.connect.json.JsonConverter</code>
+        <code>internal.key.converter.schemas.enable=false</code>
+        
<code>internal.value.converter=org.apache.kafka.connect.json.JsonConverter</code>
+        <code>internal.value.converter.schemas.enable=false</code>
+    </li>
 </ul>
 
 <h5><a id="upgrade_200_new_protocols" href="#upgrade_200_new_protocols">New 
Protocol Versions</a></h5>
diff --git 
a/tests/kafkatest/tests/connect/templates/connect-distributed.properties 
b/tests/kafkatest/tests/connect/templates/connect-distributed.properties
index a1d3de29d51..186773e7d1a 100644
--- a/tests/kafkatest/tests/connect/templates/connect-distributed.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-distributed.properties
@@ -29,11 +29,6 @@ key.converter.schemas.enable={{ 
schemas|default(True)|string|lower }}
 value.converter.schemas.enable={{ schemas|default(True)|string|lower }}
 {% endif %}
 
-internal.key.converter=org.apache.kafka.connect.json.JsonConverter
-internal.value.converter=org.apache.kafka.connect.json.JsonConverter
-internal.key.converter.schemas.enable=false
-internal.value.converter.schemas.enable=false
-
 offset.storage.topic={{ OFFSETS_TOPIC }}
 offset.storage.replication.factor={{ OFFSETS_REPLICATION_FACTOR }}
 offset.storage.partitions={{ OFFSETS_PARTITIONS }}
diff --git 
a/tests/kafkatest/tests/connect/templates/connect-standalone.properties 
b/tests/kafkatest/tests/connect/templates/connect-standalone.properties
index 5f079f7a396..a8eaa44832e 100644
--- a/tests/kafkatest/tests/connect/templates/connect-standalone.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-standalone.properties
@@ -27,11 +27,6 @@ key.converter.schemas.enable={{ 
schemas|default(True)|string|lower }}
 value.converter.schemas.enable={{ schemas|default(True)|string|lower }}
 {% endif %}
 
-internal.key.converter=org.apache.kafka.connect.json.JsonConverter
-internal.value.converter=org.apache.kafka.connect.json.JsonConverter
-internal.key.converter.schemas.enable=false
-internal.value.converter.schemas.enable=false
-
 offset.storage.file.filename={{ OFFSETS_FILE }}
 
 # Reduce the admin client request timeouts so that we don't wait the default 
120 sec before failing to connect the admin client


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Deprecate and remove internal converter configs
> -----------------------------------------------
>
>                 Key: KAFKA-5540
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5540
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.11.0.0
>            Reporter: Ewen Cheslack-Postava
>            Assignee: Chris Egerton
>            Priority: Major
>              Labels: needs-kip
>             Fix For: 2.0.0
>
>
> The internal.key.converter and internal.value.converter were originally exposed 
> as configs because a) they are actually pluggable and b) providing a default 
> would require relying on the JsonConverter always being available, which, 
> until we had classloader isolation, might have been removed for 
> compatibility reasons.
> However, this has ultimately just caused a lot more trouble and confusion 
> than it is worth. We should deprecate the configs, give them a default of 
> JsonConverter (which is also kind of nice since it results in human-readable 
> data in the internal topics), and then ultimately remove them in the next 
> major version.
> These are all public APIs so this will need a small KIP before we can make 
> the change.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to