This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 700bdd5feeb KAFKA-17997 Remove deprecated config
log.message.timestamp.difference.max.ms (#17928)
700bdd5feeb is described below
commit 700bdd5feeb5902c703f7d68e0aa45394a090120
Author: HYUNSANG HAN (한현상, Travis) <[email protected]>
AuthorDate: Fri Nov 29 04:15:46 2024 +0900
KAFKA-17997 Remove deprecated config
log.message.timestamp.difference.max.ms (#17928)
Reviewers: Divij Vaidya <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../apache/kafka/common/config/TopicConfig.java | 17 ---------
core/src/main/scala/kafka/server/KafkaConfig.scala | 31 ++--------------
.../kafka/api/PlaintextProducerSendTest.scala | 4 ---
.../server/DynamicBrokerReconfigurationTest.scala | 4 ---
.../test/scala/unit/kafka/log/LogConfigTest.scala | 18 ----------
.../scala/unit/kafka/server/KafkaConfigTest.scala | 4 ---
.../unit/kafka/server/ProduceRequestTest.scala | 3 --
docs/configuration.html | 1 -
docs/upgrade.html | 4 +++
.../kafka/server/config/ServerLogConfigs.java | 13 -------
.../server/config/ServerTopicConfigSynonyms.java | 1 -
.../kafka/storage/internals/log/LogConfig.java | 42 ++--------------------
12 files changed, 8 insertions(+), 134 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index fa9388f0ab6..17940055551 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -222,23 +222,6 @@ public class TopicConfig {
public static final String MESSAGE_TIMESTAMP_TYPE_DOC = "Define whether
the timestamp in the message is " +
"message create time or log append time.";
- /**
- * @deprecated since 3.6, removal planned in 4.0.
- * Use message.timestamp.before.max.ms and message.timestamp.after.max.ms
instead
- */
- @Deprecated
- public static final String MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG =
"message.timestamp.difference.max.ms";
-
- /**
- * @deprecated since 3.6, removal planned in 4.0.
- * Use message.timestamp.before.max.ms and message.timestamp.after.max.ms
instead
- */
- @Deprecated
- public static final String MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC =
"[DEPRECATED] The maximum difference allowed between " +
- "the timestamp when a broker receives a message and the timestamp
specified in the message. If " +
- "message.timestamp.type=CreateTime, a message will be rejected if the
difference in timestamp " +
- "exceeds this threshold. This configuration is ignored if
message.timestamp.type=LogAppendTime.";
-
public static final String MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG =
"message.timestamp.before.max.ms";
public static final String MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DOC = "This
configuration sets the allowable timestamp " +
"difference between the broker's timestamp and the message timestamp.
The message timestamp can be earlier than " +
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 18327a59b55..9502e81f3e4 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -492,35 +492,9 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
def logMessageTimestampType =
TimestampType.forName(getString(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG))
- /* See `TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG` for details
*/
- @deprecated("3.6")
- def logMessageTimestampDifferenceMaxMs: Long =
getLong(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG)
+ def logMessageTimestampBeforeMaxMs: Long =
getLong(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG)
- // In the transition period before logMessageTimestampDifferenceMaxMs is
removed, to maintain backward compatibility,
- // we are using its value if logMessageTimestampBeforeMaxMs default value
hasn't changed.
- // See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for deprecation details
- @nowarn("cat=deprecation")
- def logMessageTimestampBeforeMaxMs: Long = {
- val messageTimestampBeforeMaxMs: Long =
getLong(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG)
- if (messageTimestampBeforeMaxMs !=
ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DEFAULT) {
- messageTimestampBeforeMaxMs
- } else {
- logMessageTimestampDifferenceMaxMs
- }
- }
-
- // In the transition period before logMessageTimestampDifferenceMaxMs is
removed, to maintain backward compatibility,
- // we are using its value if logMessageTimestampAfterMaxMs default value
hasn't changed.
- // See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for deprecation details
- @nowarn("cat=deprecation")
- def logMessageTimestampAfterMaxMs: Long = {
- val messageTimestampAfterMaxMs: Long =
getLong(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG)
- if (messageTimestampAfterMaxMs != Long.MaxValue) {
- messageTimestampAfterMaxMs
- } else {
- logMessageTimestampDifferenceMaxMs
- }
- }
+ def logMessageTimestampAfterMaxMs: Long =
getLong(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG)
def logMessageDownConversionEnable: Boolean =
getBoolean(ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_CONFIG)
@@ -1088,7 +1062,6 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
logProps.put(TopicConfig.PREALLOCATE_CONFIG, logPreAllocateEnable)
logProps.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG,
logMessageFormatVersion.version)
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG,
logMessageTimestampType.name)
- logProps.put(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG,
logMessageTimestampDifferenceMaxMs: java.lang.Long)
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG,
logMessageTimestampBeforeMaxMs: java.lang.Long)
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG,
logMessageTimestampAfterMaxMs: java.lang.Long)
logProps.put(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG,
logMessageDownConversionEnable: java.lang.Boolean)
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index c1ba3a1b83c..65eedf96e3a 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -33,7 +33,6 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
import java.nio.charset.StandardCharsets
-import scala.annotation.nowarn
class PlaintextProducerSendTest extends BaseProducerSendTest {
@@ -276,14 +275,11 @@ class PlaintextProducerSendTest extends
BaseProducerSendTest {
object PlaintextProducerSendTest {
- // See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for deprecation details
- @nowarn("cat=deprecation")
def quorumAndTimestampConfigProvider: java.util.stream.Stream[Arguments] = {
val now: Long = System.currentTimeMillis()
val fiveMinutesInMs: Long = 5 * 60 * 60 * 1000L
val data = new java.util.ArrayList[Arguments]()
for (groupProtocol <- GroupProtocol.values().map(gp =>
gp.name.toLowerCase(Locale.ROOT))) {
- data.add(Arguments.of("kraft", groupProtocol,
TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, Long.box(now -
fiveMinutesInMs)))
data.add(Arguments.of("kraft", groupProtocol,
TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, Long.box(now -
fiveMinutesInMs)))
data.add(Arguments.of("kraft", groupProtocol,
TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, Long.box(now +
fiveMinutesInMs)))
}
diff --git
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index b0c86ee4c67..a62fb75cad3 100644
---
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -590,7 +590,6 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
- @nowarn("cat=deprecation") // See
`TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for deprecation details
def testDefaultTopicConfig(quorum: String, groupProtocol: String): Unit = {
val (producerThread, consumerThread) = startProduceConsume(retries = 0,
groupProtocol)
@@ -615,7 +614,6 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
props.put(ServerConfigs.COMPRESSION_TYPE_CONFIG, "gzip")
props.put(ServerLogConfigs.LOG_PRE_ALLOCATE_CONFIG, true.toString)
props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG,
TimestampType.LOG_APPEND_TIME.toString)
- props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG,
"1000")
props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG,
"1000")
props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG,
"1000")
props.put(ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_CONFIG,
"false")
@@ -657,14 +655,12 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
// Verify that we can alter subset of log configs
props.clear()
props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG,
TimestampType.CREATE_TIME.toString)
- props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG,
"1000")
props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG,
"1000")
props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG,
"1000")
reconfigureServers(props, perBrokerConfig = false,
(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG,
TimestampType.CREATE_TIME.toString))
consumerThread.waitForMatchingRecords(record => record.timestampType ==
TimestampType.CREATE_TIME)
// Verify that invalid configs are not applied
val invalidProps = Map(
- ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG ->
"abc", // Invalid type
ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG -> "abc", //
Invalid type
ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG -> "abc", //
Invalid type
ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG -> "invalid", //
Invalid value
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 8126bb08b07..d9fa241075d 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -470,24 +470,6 @@ class LogConfigTest {
"`remote.log.copy.disable` under Zookeeper's mode."))
}
- /* Verify that when the deprecated config
LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG has non default value the new
configs
- * LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG and
LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG are not changed from the default we
are using
- * the deprecated config for backward compatibility.
- * See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for deprecation details */
- @nowarn("cat=deprecation")
- @Test
- def testTimestampBeforeMaxMsUsesDeprecatedConfig(): Unit = {
- val oneDayInMillis = 24 * 60 * 60 * 1000L
- val kafkaProps = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
-
kafkaProps.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG,
Long.MaxValue.toString)
- kafkaProps.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG,
Long.MaxValue.toString)
-
kafkaProps.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG,
oneDayInMillis.toString)
-
- val logProps = KafkaConfig.fromProps(kafkaProps).extractLogConfigMap
- assertEquals(oneDayInMillis,
logProps.get(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG))
- assertEquals(oneDayInMillis,
logProps.get(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG))
- }
-
@Test
def testValidateWithMetadataVersionJbodSupport(): Unit = {
def validate(metadataVersion: MetadataVersion, jbodConfig: Boolean): Unit =
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 1c5c2d3b0cd..179a5db7ac7 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -845,7 +845,6 @@ class KafkaConfigTest {
}
@Test
- @nowarn("cat=deprecation") // See
`TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for deprecation details
def testFromPropsInvalid(): Unit = {
def baseProperties: Properties = {
val validRequiredProperties = new Properties()
@@ -940,7 +939,6 @@ class KafkaConfigTest {
case ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case ServerLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
case ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
- case ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG
=> assertPropertyInvalid(baseProperties, name, "not_a_number")
case ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
case ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
case
ServerLogConfigs.LOG_FLUSH_START_OFFSET_CHECKPOINT_INTERVAL_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
@@ -1209,8 +1207,6 @@ class KafkaConfigTest {
assertDynamic(kafkaConfigProp, 10008, () => config.messageMaxBytes)
case TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG =>
assertDynamic(kafkaConfigProp, false, () =>
config.logMessageDownConversionEnable)
- case TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG =>
- assertDynamic(kafkaConfigProp, 10009, () =>
config.logMessageTimestampDifferenceMaxMs)
case TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG =>
assertDynamic(kafkaConfigProp, 10015L, () =>
config.logMessageTimestampBeforeMaxMs)
case TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG =>
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index 4b952674429..a270fe7ac4b 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -37,7 +37,6 @@ import org.junit.jupiter.params.provider.{Arguments,
MethodSource}
import org.junit.jupiter.params.provider.ValueSource
import java.util.concurrent.TimeUnit
-import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
/**
@@ -299,11 +298,9 @@ class ProduceRequestTest extends BaseRequestTest {
object ProduceRequestTest {
- @nowarn("cat=deprecation") // See
`TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for deprecation details
def timestampConfigProvider: java.util.stream.Stream[Arguments] = {
val fiveMinutesInMs: Long = 5 * 60 * 60 * 1000L
java.util.stream.Stream.of[Arguments](
- Arguments.of(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG,
Long.box(System.currentTimeMillis() - fiveMinutesInMs)),
Arguments.of(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG,
Long.box(System.currentTimeMillis() - fiveMinutesInMs)),
Arguments.of(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG,
Long.box(System.currentTimeMillis() + fiveMinutesInMs))
)
diff --git a/docs/configuration.html b/docs/configuration.html
index 1f43995bd11..20d2221f44d 100644
--- a/docs/configuration.html
+++ b/docs/configuration.html
@@ -128,7 +128,6 @@
<li><code>compression.type</code></li>
<li><code>log.preallocate</code></li>
<li><code>log.message.timestamp.type</code></li>
- <li><code>log.message.timestamp.difference.max.ms</code></li>
</ul>
<h5>Updating Log Cleaner Configs</h5>
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 4ed85d69c65..5a7a13f71fd 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -55,6 +55,10 @@
The <code>offsets.commit.required.acks</code>
configuration was removed.
See <a
href="https://cwiki.apache.org/confluence/x/9YobEg">KIP-1041</a> for details.
</li>
+ <li>The
<code>log.message.timestamp.difference.max.ms</code> configuration was removed.
+ Please use
<code>log.message.timestamp.before.max.ms</code> and
<code>log.message.timestamp.after.max.ms</code> instead.
+ See <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-937%3A+Improve+Message+Timestamp+Validation">KIP-937</a>
for details.
+ </li>
</ul>
</li>
<li><b>MirrorMaker</b>
diff --git
a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
index 36409458a64..e8fb328732c 100644
---
a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
+++
b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
@@ -124,19 +124,6 @@ public class ServerLogConfigs {
public static final String LOG_MESSAGE_TIMESTAMP_TYPE_DOC = "Define
whether the timestamp in the message is message create time or log append time.
The value should be either " +
"<code>CreateTime</code> or <code>LogAppendTime</code>.";
- /* See `TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG` for
details */
- /**
- * @deprecated since "3.6"
- */
- @Deprecated
- public static final String LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG
=
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG);
- @Deprecated
- public static final long LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DEFAULT =
Long.MAX_VALUE;
- public static final String LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC =
"[DEPRECATED] The maximum difference allowed between the timestamp when a
broker receives " +
- "a message and the timestamp specified in the message. If
log.message.timestamp.type=CreateTime, a message will be rejected " +
- "if the difference in timestamp exceeds this threshold. This
configuration is ignored if log.message.timestamp.type=LogAppendTime." +
- "The maximum timestamp difference allowed should be no greater
than log.retention.ms to avoid unnecessarily frequent log rolling.";
-
public static final String LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG =
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG);
public static final long LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DEFAULT =
Long.MAX_VALUE;
public static final String LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DOC = "This
configuration sets the allowable timestamp difference between the " +
diff --git
a/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java
b/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java
index 66747e74364..8057abe67cf 100644
---
a/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java
+++
b/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java
@@ -86,7 +86,6 @@ public final class ServerTopicConfigSynonyms {
sameNameWithLogPrefix(TopicConfig.PREALLOCATE_CONFIG),
sameNameWithLogPrefix(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG),
sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG),
-
sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG),
sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG),
sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG),
sameNameWithLogPrefix(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG),
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index 52574021d0c..3ca78b38e06 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -180,8 +180,6 @@ public class LogConfig extends AbstractConfig {
public static final boolean DEFAULT_UNCLEAN_LEADER_ELECTION_ENABLE = false;
public static final String DEFAULT_COMPRESSION_TYPE =
BrokerCompressionType.PRODUCER.name;
public static final boolean DEFAULT_PREALLOCATE = false;
- @Deprecated
- public static final long DEFAULT_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS =
ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DEFAULT;
public static final boolean DEFAULT_REMOTE_STORAGE_ENABLE = false;
public static final boolean DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG = false;
@@ -198,12 +196,6 @@ public class LogConfig extends AbstractConfig {
@SuppressWarnings("deprecation")
private static final String MESSAGE_FORMAT_VERSION_CONFIG =
TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG;
- @SuppressWarnings("deprecation")
- private static final String MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG =
TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG;
-
- @SuppressWarnings("deprecation")
- private static final String MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC =
TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC;
-
// Visible for testing
public static final Set<String> CONFIGS_WITH_NO_SERVER_DEFAULTS = Set.of(
TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
@@ -250,7 +242,6 @@ public class LogConfig extends AbstractConfig {
.define(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG, INT,
ServerLogConfigs.MIN_IN_SYNC_REPLICAS_DEFAULT, atLeast(1), HIGH,
ServerLogConfigs.MIN_IN_SYNC_REPLICAS_DOC)
.define(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG,
STRING, ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_DEFAULT, new
MetadataVersionValidator(), MEDIUM,
ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_DOC)
.define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG,
STRING, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_DEFAULT,
ConfigDef.ValidString.in("CreateTime", "LogAppendTime"), MEDIUM,
ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_DOC)
-
.define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, LONG,
ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DEFAULT, atLeast(0),
MEDIUM, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC)
.define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, LONG,
ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DEFAULT, atLeast(0),
MEDIUM, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DOC)
.define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, LONG,
ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT, atLeast(0),
MEDIUM, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DOC)
.define(ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG,
CLASS, null, LOW, ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_DOC)
@@ -311,8 +302,6 @@ public class LogConfig extends AbstractConfig {
MESSAGE_FORMAT_VERSION_DOC)
.define(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, STRING,
ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_DEFAULT,
in("CreateTime", "LogAppendTime"), MEDIUM,
TopicConfig.MESSAGE_TIMESTAMP_TYPE_DOC)
- .define(MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, LONG,
DEFAULT_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS,
- atLeast(0), MEDIUM,
MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC)
.define(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG,
LONG, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DEFAULT,
atLeast(0), MEDIUM,
TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DOC)
.define(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG,
LONG, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT,
@@ -367,9 +356,6 @@ public class LogConfig extends AbstractConfig {
public final TimestampType messageTimestampType;
- /* See `TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG` for
details regarding the deprecation */
- @Deprecated
- public final long messageTimestampDifferenceMaxMs;
public final long messageTimestampBeforeMaxMs;
public final long messageTimestampAfterMaxMs;
public final List<String> leaderReplicationThrottledReplicas;
@@ -420,9 +406,8 @@ public class LogConfig extends AbstractConfig {
this.preallocate = getBoolean(TopicConfig.PREALLOCATE_CONFIG);
this.messageFormatVersion =
MetadataVersion.fromVersionString(getString(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG));
this.messageTimestampType =
TimestampType.forName(getString(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG));
- this.messageTimestampDifferenceMaxMs =
getLong(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG);
- this.messageTimestampBeforeMaxMs = getMessageTimestampBeforeMaxMs();
- this.messageTimestampAfterMaxMs = getMessageTimestampAfterMaxMs();
+ this.messageTimestampBeforeMaxMs =
getLong(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG);
+ this.messageTimestampAfterMaxMs =
getLong(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG);
this.leaderReplicationThrottledReplicas =
Collections.unmodifiableList(getList(QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG));
this.followerReplicationThrottledReplicas =
Collections.unmodifiableList(getList(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG));
this.messageDownConversionEnable =
getBoolean(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG);
@@ -455,28 +440,6 @@ public class LogConfig extends AbstractConfig {
}
}
- //In the transition period before messageTimestampDifferenceMaxMs is
removed, to maintain backward compatibility,
- // we are using its value if messageTimestampBeforeMaxMs default value
hasn't changed.
- private long getMessageTimestampBeforeMaxMs() {
- final Long messageTimestampBeforeMaxMs =
getLong(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG);
- if (!messageTimestampBeforeMaxMs.equals(Long.MAX_VALUE)) {
- return messageTimestampBeforeMaxMs;
- } else {
- return messageTimestampDifferenceMaxMs;
- }
- }
-
- //In the transition period before messageTimestampDifferenceMaxMs is
removed, to maintain backward compatibility,
- // we are using its value if messageTimestampAfterMaxMs default value
hasn't changed.
- private long getMessageTimestampAfterMaxMs() {
- final Long messageTimestampAfterMaxMs =
getLong(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG);
- if (!messageTimestampAfterMaxMs.equals(Long.MAX_VALUE)) {
- return messageTimestampAfterMaxMs;
- } else {
- return messageTimestampDifferenceMaxMs;
- }
- }
-
public RecordVersion recordVersion() {
return messageFormatVersion.highestSupportedRecordVersion();
}
@@ -783,7 +746,6 @@ public class LogConfig extends AbstractConfig {
", preallocate=" + preallocate +
", messageFormatVersion=" + messageFormatVersion +
", messageTimestampType=" + messageTimestampType +
- ", messageTimestampDifferenceMaxMs=" +
messageTimestampDifferenceMaxMs +
", leaderReplicationThrottledReplicas=" +
leaderReplicationThrottledReplicas +
", followerReplicationThrottledReplicas=" +
followerReplicationThrottledReplicas +
", messageDownConversionEnable=" + messageDownConversionEnable
+