This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8725e36 MINOR: Remove deprecated streams config (#4906)
8725e36 is described below
commit 8725e3604b31d0bf822959fc6d870512411c7a05
Author: Guozhang Wang <[email protected]>
AuthorDate: Thu Apr 26 13:16:51 2018 -0700
MINOR: Remove deprecated streams config (#4906)
Removed the following: "zookeeper.connect", "key.serde", "value.serde",
"timestamp.extractor"
Reviewers: Bill Bejeck <[email protected]>, John Roesler
<[email protected]>, Jason Gustafson <[email protected]>
---
.../org/apache/kafka/streams/StreamsConfig.java | 105 ++-------------------
.../apache/kafka/streams/StreamsConfigTest.java | 47 +--------
2 files changed, 8 insertions(+), 144 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index e46d6d0..23e69c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -279,14 +279,6 @@ public class StreamsConfig extends AbstractConfig {
public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG =
"default.timestamp.extractor";
private static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC =
"Default timestamp extractor class that implements the
<code>org.apache.kafka.streams.processor.TimestampExtractor</code> interface.";
- /**
- * {@code key.serde}
- * @deprecated Use {@link #DEFAULT_KEY_SERDE_CLASS_CONFIG} instead.
- */
- @Deprecated
- public static final String KEY_SERDE_CLASS_CONFIG = "key.serde";
- private static final String KEY_SERDE_CLASS_DOC = "Serializer /
deserializer class for key that implements the
<code>org.apache.kafka.common.serialization.Serde</code> interface. This config
is deprecated, use <code>" + DEFAULT_KEY_SERDE_CLASS_CONFIG + "</code> instead";
-
/** {@code metadata.max.age.ms} */
public static final String METADATA_MAX_AGE_CONFIG =
CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
@@ -363,40 +355,16 @@ public class StreamsConfig extends AbstractConfig {
public static final String STATE_DIR_CONFIG = "state.dir";
private static final String STATE_DIR_DOC = "Directory location for state
store.";
- /**
- * {@code timestamp.extractor}
- * @deprecated Use {@link #DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG}
instead.
- */
- @Deprecated
- public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG =
"timestamp.extractor";
- private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp
extractor class that implements the
<code>org.apache.kafka.streams.processor.TimestampExtractor</code> interface.
This config is deprecated, use <code>" +
DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG + "</code> instead";
-
/** {@code upgrade.from} */
public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
public static final String UPGRADE_FROM_DOC = "Allows upgrading from
versions 0.10.0/0.10.1/0.10.2/0.11.0/1.0/1.1 to version 1.2 (or newer) in a
backward compatible way. " +
"When upgrading from 1.2 to a newer version it is not required to
specify this config." +
"Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\",
\"" + UPGRADE_FROM_0101 + "\", \"" + UPGRADE_FROM_0102 + "\", \"" +
UPGRADE_FROM_0110 + "\", \"" + UPGRADE_FROM_10 + "\", \"" + UPGRADE_FROM_11 +
"\" (for upgrading from the corresponding old version).";
- /**
- * {@code value.serde}
- * @deprecated Use {@link #DEFAULT_VALUE_SERDE_CLASS_CONFIG} instead.
- */
- @Deprecated
- public static final String VALUE_SERDE_CLASS_CONFIG = "value.serde";
- private static final String VALUE_SERDE_CLASS_DOC = "Serializer /
deserializer class for value that implements the
<code>org.apache.kafka.common.serialization.Serde</code> interface. This config
is deprecated, use <code>" + DEFAULT_VALUE_SERDE_CLASS_CONFIG + "</code>
instead";
-
/** {@code windowstore.changelog.additional.retention.ms} */
public static final String
WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG =
"windowstore.changelog.additional.retention.ms";
private static final String
WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows
maintainMs to ensure data is not deleted from the log prematurely. Allows for
clock drift. Default is 1 day";
- /**
- * {@code zookeeper.connect}
- * @deprecated Kafka Streams does not use Zookeeper anymore and this
parameter will be ignored.
- */
- @Deprecated
- public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
- private static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect
string for Kafka topics management. This config is deprecated and will be
ignored as Streams API does not use Zookeeper anymore.";
-
private static final String[] NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS =
new String[] {ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG};
private static final String[] NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS = new
String[] {ConsumerConfig.ISOLATION_LEVEL_CONFIG};
private static final String[] NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS = new
String[] {ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,
@@ -609,30 +577,7 @@ public class StreamsConfig extends AbstractConfig {
Type.LONG,
24 * 60 * 60 * 1000L,
Importance.LOW,
- WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC)
-
- // @deprecated
-
- .define(KEY_SERDE_CLASS_CONFIG,
- Type.CLASS,
- null,
- Importance.LOW,
- KEY_SERDE_CLASS_DOC)
- .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
- Type.CLASS,
- null,
- Importance.LOW,
- TIMESTAMP_EXTRACTOR_CLASS_DOC)
- .define(VALUE_SERDE_CLASS_CONFIG,
- Type.CLASS,
- null,
- Importance.LOW,
- VALUE_SERDE_CLASS_DOC)
- .define(ZOOKEEPER_CONNECT_CONFIG,
- Type.STRING,
- "",
- Importance.LOW,
- ZOOKEEPER_CONNECT_DOC);
+ WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC);
}
// this is the list of configs for underlying clients
@@ -773,8 +718,6 @@ public class StreamsConfig extends AbstractConfig {
// bootstrap.servers should be from StreamsConfig
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
originals().get(BOOTSTRAP_SERVERS_CONFIG));
- // remove deprecate ZK config
- consumerProps.remove(ZOOKEEPER_CONNECT_CONFIG);
return consumerProps;
}
@@ -969,30 +912,15 @@ public class StreamsConfig extends AbstractConfig {
}
/**
- * Return an {@link Serde#configure(Map, boolean) configured} instance of
{@link #KEY_SERDE_CLASS_CONFIG key Serde
- * class}. This method is deprecated. Use {@link #defaultKeySerde()}
method instead.
- *
- * @return an configured instance of key Serde class
- */
- @Deprecated
- public Serde keySerde() {
- return defaultKeySerde();
- }
-
- /**
* Return an {@link Serde#configure(Map, boolean) configured} instance of
{@link #DEFAULT_KEY_SERDE_CLASS_CONFIG key Serde
* class}.
*
* @return an configured instance of key Serde class
*/
public Serde defaultKeySerde() {
- Object keySerdeConfigSetting = get(KEY_SERDE_CLASS_CONFIG);
+ Object keySerdeConfigSetting = get(DEFAULT_KEY_SERDE_CLASS_CONFIG);
try {
- Serde<?> serde = getConfiguredInstance(KEY_SERDE_CLASS_CONFIG,
Serde.class);
- if (serde == null) {
- keySerdeConfigSetting = get(DEFAULT_KEY_SERDE_CLASS_CONFIG);
- serde = getConfiguredInstance(DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serde.class);
- }
+ Serde<?> serde =
getConfiguredInstance(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serde.class);
serde.configure(originals(), true);
return serde;
} catch (final Exception e) {
@@ -1002,32 +930,16 @@ public class StreamsConfig extends AbstractConfig {
}
/**
- * Return an {@link Serde#configure(Map, boolean) configured} instance of
{@link #VALUE_SERDE_CLASS_CONFIG value
- * Serde class}. This method is deprecated. Use {@link
#defaultValueSerde()} instead.
- *
- * @return an configured instance of value Serde class
- */
- @Deprecated
- public Serde valueSerde() {
- return defaultValueSerde();
- }
-
- /**
* Return an {@link Serde#configure(Map, boolean) configured} instance of
{@link #DEFAULT_VALUE_SERDE_CLASS_CONFIG value
* Serde class}.
*
* @return an configured instance of value Serde class
*/
public Serde defaultValueSerde() {
- Object valueSerdeConfigSetting = get(VALUE_SERDE_CLASS_CONFIG);
+ Object valueSerdeConfigSetting = get(DEFAULT_VALUE_SERDE_CLASS_CONFIG);
try {
- Serde<?> serde = getConfiguredInstance(VALUE_SERDE_CLASS_CONFIG,
Serde.class);
- if (serde == null) {
- valueSerdeConfigSetting =
get(DEFAULT_VALUE_SERDE_CLASS_CONFIG);
- serde =
getConfiguredInstance(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serde.class);
- }
+ Serde<?> serde =
getConfiguredInstance(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serde.class);
serde.configure(originals(), false);
-
return serde;
} catch (final Exception e) {
throw new StreamsException(
@@ -1035,13 +947,8 @@ public class StreamsConfig extends AbstractConfig {
}
}
-
public TimestampExtractor defaultTimestampExtractor() {
- TimestampExtractor timestampExtractor =
getConfiguredInstance(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
TimestampExtractor.class);
- if (timestampExtractor == null) {
- return
getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
TimestampExtractor.class);
- }
- return timestampExtractor;
+ return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
TimestampExtractor.class);
}
public DeserializationExceptionHandler
defaultDeserializationExceptionHandler() {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index ef5e5a8..ac82c04 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -454,20 +454,6 @@ public class StreamsConfigTest {
assertThat(streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG),
equalTo(commitIntervalMs));
}
- @SuppressWarnings("deprecation")
- @Test
- public void shouldBeBackwardsCompatibleWithDeprecatedConfigs() {
- final Properties props = minimalStreamsConfig();
- props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
Serdes.Double().getClass());
- props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.Double().getClass());
- props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
MockTimestampExtractor.class);
-
- final StreamsConfig config = new StreamsConfig(props);
- assertTrue(config.defaultKeySerde() instanceof Serdes.DoubleSerde);
- assertTrue(config.defaultValueSerde() instanceof Serdes.DoubleSerde);
- assertTrue(config.defaultTimestampExtractor() instanceof
MockTimestampExtractor);
- }
-
@Test
public void shouldUseNewConfigsWhenPresent() {
final Properties props = minimalStreamsConfig();
@@ -489,28 +475,13 @@ public class StreamsConfigTest {
assertTrue(config.defaultTimestampExtractor() instanceof
FailOnInvalidTimestamp);
}
- @SuppressWarnings("deprecation")
- @Test
- public void
shouldSpecifyCorrectKeySerdeClassOnErrorUsingDeprecatedConfigs() {
- final Properties props = minimalStreamsConfig();
- props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
MisconfiguredSerde.class);
- final StreamsConfig config = new StreamsConfig(props);
- try {
- config.keySerde();
- fail("Test should throw a StreamsException");
- } catch (StreamsException e) {
- assertEquals("Failed to configure key serde class
org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", e.getMessage());
- }
- }
-
- @SuppressWarnings("deprecation")
@Test
public void shouldSpecifyCorrectKeySerdeClassOnError() {
final Properties props = minimalStreamsConfig();
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
MisconfiguredSerde.class);
final StreamsConfig config = new StreamsConfig(props);
try {
- config.keySerde();
+ config.defaultKeySerde();
fail("Test should throw a StreamsException");
} catch (StreamsException e) {
assertEquals("Failed to configure key serde class
org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", e.getMessage());
@@ -519,26 +490,12 @@ public class StreamsConfigTest {
@SuppressWarnings("deprecation")
@Test
- public void
shouldSpecifyCorrectValueSerdeClassOnErrorUsingDeprecatedConfigs() {
- final Properties props = minimalStreamsConfig();
- props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
MisconfiguredSerde.class);
- final StreamsConfig config = new StreamsConfig(props);
- try {
- config.valueSerde();
- fail("Test should throw a StreamsException");
- } catch (StreamsException e) {
- assertEquals("Failed to configure value serde class
org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", e.getMessage());
- }
- }
-
- @SuppressWarnings("deprecation")
- @Test
public void shouldSpecifyCorrectValueSerdeClassOnError() {
final Properties props = minimalStreamsConfig();
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
MisconfiguredSerde.class);
final StreamsConfig config = new StreamsConfig(props);
try {
- config.valueSerde();
+ config.defaultValueSerde();
fail("Test should throw a StreamsException");
} catch (StreamsException e) {
assertEquals("Failed to configure value serde class
org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", e.getMessage());
--
To stop receiving notification emails like this one, please contact
[email protected].