This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 700bdd5feeb KAFKA-17997 Remove deprecated config log.message.timestamp.difference.max.ms (#17928)
700bdd5feeb is described below

commit 700bdd5feeb5902c703f7d68e0aa45394a090120
Author: HYUNSANG HAN (한현상, Travis) <[email protected]>
AuthorDate: Fri Nov 29 04:15:46 2024 +0900

    KAFKA-17997 Remove deprecated config log.message.timestamp.difference.max.ms (#17928)
    
    Reviewers: Divij Vaidya <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../apache/kafka/common/config/TopicConfig.java    | 17 ---------
 core/src/main/scala/kafka/server/KafkaConfig.scala | 31 ++--------------
 .../kafka/api/PlaintextProducerSendTest.scala      |  4 ---
 .../server/DynamicBrokerReconfigurationTest.scala  |  4 ---
 .../test/scala/unit/kafka/log/LogConfigTest.scala  | 18 ----------
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  4 ---
 .../unit/kafka/server/ProduceRequestTest.scala     |  3 --
 docs/configuration.html                            |  1 -
 docs/upgrade.html                                  |  4 +++
 .../kafka/server/config/ServerLogConfigs.java      | 13 -------
 .../server/config/ServerTopicConfigSynonyms.java   |  1 -
 .../kafka/storage/internals/log/LogConfig.java     | 42 ++--------------------
 12 files changed, 8 insertions(+), 134 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java 
b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index fa9388f0ab6..17940055551 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -222,23 +222,6 @@ public class TopicConfig {
     public static final String MESSAGE_TIMESTAMP_TYPE_DOC = "Define whether 
the timestamp in the message is " +
         "message create time or log append time.";
 
-    /**
-     * @deprecated since 3.6, removal planned in 4.0.
-     * Use message.timestamp.before.max.ms and message.timestamp.after.max.ms 
instead
-     */
-    @Deprecated
-    public static final String MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG = 
"message.timestamp.difference.max.ms";
-
-    /**
-     * @deprecated since 3.6, removal planned in 4.0.
-     * Use message.timestamp.before.max.ms and message.timestamp.after.max.ms 
instead
-     */
-    @Deprecated
-    public static final String MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC = 
"[DEPRECATED] The maximum difference allowed between " +
-        "the timestamp when a broker receives a message and the timestamp 
specified in the message. If " +
-        "message.timestamp.type=CreateTime, a message will be rejected if the 
difference in timestamp " +
-        "exceeds this threshold. This configuration is ignored if 
message.timestamp.type=LogAppendTime.";
-
     public static final String MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG = 
"message.timestamp.before.max.ms";
     public static final String MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DOC = "This 
configuration sets the allowable timestamp " +
         "difference between the broker's timestamp and the message timestamp. 
The message timestamp can be earlier than " +
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 18327a59b55..9502e81f3e4 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -492,35 +492,9 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
 
   def logMessageTimestampType = 
TimestampType.forName(getString(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG))
 
-  /* See `TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG` for details 
*/
-  @deprecated("3.6")
-  def logMessageTimestampDifferenceMaxMs: Long = 
getLong(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG)
+  def logMessageTimestampBeforeMaxMs: Long = 
getLong(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG)
 
-  // In the transition period before logMessageTimestampDifferenceMaxMs is 
removed, to maintain backward compatibility,
-  // we are using its value if logMessageTimestampBeforeMaxMs default value 
hasn't changed.
-  // See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for deprecation details
-  @nowarn("cat=deprecation")
-  def logMessageTimestampBeforeMaxMs: Long = {
-    val messageTimestampBeforeMaxMs: Long = 
getLong(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG)
-    if (messageTimestampBeforeMaxMs != 
ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DEFAULT) {
-      messageTimestampBeforeMaxMs
-    } else {
-      logMessageTimestampDifferenceMaxMs
-    }
-  }
-
-  // In the transition period before logMessageTimestampDifferenceMaxMs is 
removed, to maintain backward compatibility,
-  // we are using its value if logMessageTimestampAfterMaxMs default value 
hasn't changed.
-  // See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for deprecation details
-  @nowarn("cat=deprecation")
-  def logMessageTimestampAfterMaxMs: Long = {
-    val messageTimestampAfterMaxMs: Long = 
getLong(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG)
-    if (messageTimestampAfterMaxMs != Long.MaxValue) {
-      messageTimestampAfterMaxMs
-    } else {
-      logMessageTimestampDifferenceMaxMs
-    }
-  }
+  def logMessageTimestampAfterMaxMs: Long = 
getLong(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG)
 
   def logMessageDownConversionEnable: Boolean = 
getBoolean(ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_CONFIG)
 
@@ -1088,7 +1062,6 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
     logProps.put(TopicConfig.PREALLOCATE_CONFIG, logPreAllocateEnable)
     logProps.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, 
logMessageFormatVersion.version)
     logProps.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, 
logMessageTimestampType.name)
-    logProps.put(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, 
logMessageTimestampDifferenceMaxMs: java.lang.Long)
     logProps.put(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, 
logMessageTimestampBeforeMaxMs: java.lang.Long)
     logProps.put(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, 
logMessageTimestampAfterMaxMs: java.lang.Long)
     logProps.put(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, 
logMessageDownConversionEnable: java.lang.Boolean)
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index c1ba3a1b83c..65eedf96e3a 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -33,7 +33,6 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{Arguments, MethodSource}
 
 import java.nio.charset.StandardCharsets
-import scala.annotation.nowarn
 
 
 class PlaintextProducerSendTest extends BaseProducerSendTest {
@@ -276,14 +275,11 @@ class PlaintextProducerSendTest extends 
BaseProducerSendTest {
 
 object PlaintextProducerSendTest {
 
-  // See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for deprecation details
-  @nowarn("cat=deprecation")
   def quorumAndTimestampConfigProvider: java.util.stream.Stream[Arguments] = {
     val now: Long = System.currentTimeMillis()
     val fiveMinutesInMs: Long = 5 * 60 * 60 * 1000L
     val data = new java.util.ArrayList[Arguments]()
     for (groupProtocol <- GroupProtocol.values().map(gp => 
gp.name.toLowerCase(Locale.ROOT))) {
-      data.add(Arguments.of("kraft", groupProtocol, 
TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, Long.box(now - 
fiveMinutesInMs)))
       data.add(Arguments.of("kraft", groupProtocol, 
TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, Long.box(now - 
fiveMinutesInMs)))
       data.add(Arguments.of("kraft", groupProtocol, 
TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, Long.box(now + 
fiveMinutesInMs)))
     }
diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index b0c86ee4c67..a62fb75cad3 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -590,7 +590,6 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
 
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
-  @nowarn("cat=deprecation") // See 
`TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for deprecation details
   def testDefaultTopicConfig(quorum: String, groupProtocol: String): Unit = {
     val (producerThread, consumerThread) = startProduceConsume(retries = 0, 
groupProtocol)
 
@@ -615,7 +614,6 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
     props.put(ServerConfigs.COMPRESSION_TYPE_CONFIG, "gzip")
     props.put(ServerLogConfigs.LOG_PRE_ALLOCATE_CONFIG, true.toString)
     props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG, 
TimestampType.LOG_APPEND_TIME.toString)
-    props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, 
"1000")
     props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, 
"1000")
     props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, 
"1000")
     props.put(ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, 
"false")
@@ -657,14 +655,12 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
     // Verify that we can alter subset of log configs
     props.clear()
     props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG, 
TimestampType.CREATE_TIME.toString)
-    props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, 
"1000")
     props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, 
"1000")
     props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, 
"1000")
     reconfigureServers(props, perBrokerConfig = false, 
(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG, 
TimestampType.CREATE_TIME.toString))
     consumerThread.waitForMatchingRecords(record => record.timestampType == 
TimestampType.CREATE_TIME)
     // Verify that invalid configs are not applied
     val invalidProps = Map(
-      ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG -> 
"abc", // Invalid type
       ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG -> "abc", // 
Invalid type
       ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG -> "abc", // 
Invalid type
       ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG -> "invalid", // 
Invalid value
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala 
b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 8126bb08b07..d9fa241075d 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -470,24 +470,6 @@ class LogConfigTest {
       "`remote.log.copy.disable` under Zookeeper's mode."))
   }
 
-  /* Verify that when the deprecated config 
LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG has non default value the new 
configs
-   * LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG and 
LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG are not changed from the default we 
are using
-   * the deprecated config for backward compatibility.
-   * See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for deprecation details */
-  @nowarn("cat=deprecation")
-  @Test
-  def testTimestampBeforeMaxMsUsesDeprecatedConfig(): Unit = {
-    val oneDayInMillis = 24 * 60 * 60 * 1000L
-    val kafkaProps = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
-    
kafkaProps.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, 
Long.MaxValue.toString)
-    kafkaProps.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, 
Long.MaxValue.toString)
-    
kafkaProps.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, 
oneDayInMillis.toString)
-
-    val logProps = KafkaConfig.fromProps(kafkaProps).extractLogConfigMap
-    assertEquals(oneDayInMillis, 
logProps.get(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG))
-    assertEquals(oneDayInMillis, 
logProps.get(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG))
-  }
-
   @Test
   def testValidateWithMetadataVersionJbodSupport(): Unit = {
     def validate(metadataVersion: MetadataVersion, jbodConfig: Boolean): Unit =
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 1c5c2d3b0cd..179a5db7ac7 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -845,7 +845,6 @@ class KafkaConfigTest {
   }
 
   @Test
-  @nowarn("cat=deprecation") // See 
`TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for deprecation details
   def testFromPropsInvalid(): Unit = {
     def baseProperties: Properties = {
       val validRequiredProperties = new Properties()
@@ -940,7 +939,6 @@ class KafkaConfigTest {
         case ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
         case ServerLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
         case ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
-        case ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG 
=> assertPropertyInvalid(baseProperties, name, "not_a_number")
         case ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
         case ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
         case 
ServerLogConfigs.LOG_FLUSH_START_OFFSET_CHECKPOINT_INTERVAL_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
@@ -1209,8 +1207,6 @@ class KafkaConfigTest {
           assertDynamic(kafkaConfigProp, 10008, () => config.messageMaxBytes)
         case TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG =>
           assertDynamic(kafkaConfigProp, false, () => 
config.logMessageDownConversionEnable)
-        case TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG =>
-          assertDynamic(kafkaConfigProp, 10009, () => 
config.logMessageTimestampDifferenceMaxMs)
         case TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG =>
           assertDynamic(kafkaConfigProp, 10015L, () => 
config.logMessageTimestampBeforeMaxMs)
         case TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG =>
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index 4b952674429..a270fe7ac4b 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -37,7 +37,6 @@ import org.junit.jupiter.params.provider.{Arguments, 
MethodSource}
 import org.junit.jupiter.params.provider.ValueSource
 
 import java.util.concurrent.TimeUnit
-import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 
 /**
@@ -299,11 +298,9 @@ class ProduceRequestTest extends BaseRequestTest {
 
 object ProduceRequestTest {
 
-  @nowarn("cat=deprecation") // See 
`TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for deprecation details
   def timestampConfigProvider: java.util.stream.Stream[Arguments] = {
     val fiveMinutesInMs: Long = 5 * 60 * 60 * 1000L
     java.util.stream.Stream.of[Arguments](
-      Arguments.of(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, 
Long.box(System.currentTimeMillis() - fiveMinutesInMs)),
       Arguments.of(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, 
Long.box(System.currentTimeMillis() - fiveMinutesInMs)),
       Arguments.of(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, 
Long.box(System.currentTimeMillis() + fiveMinutesInMs))
     )
diff --git a/docs/configuration.html b/docs/configuration.html
index 1f43995bd11..20d2221f44d 100644
--- a/docs/configuration.html
+++ b/docs/configuration.html
@@ -128,7 +128,6 @@
     <li><code>compression.type</code></li>
     <li><code>log.preallocate</code></li>
     <li><code>log.message.timestamp.type</code></li>
-    <li><code>log.message.timestamp.difference.max.ms</code></li>
   </ul>
 
   <h5>Updating Log Cleaner Configs</h5>
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 4ed85d69c65..5a7a13f71fd 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -55,6 +55,10 @@
                             The <code>offsets.commit.required.acks</code> 
configuration was removed.
                             See <a href="https://cwiki.apache.org/confluence/x/9YobEg">KIP-1041</a> for details.
                         </li>
+                        <li>The 
<code>log.message.timestamp.difference.max.ms</code> configuration was removed.
+                            Please use 
<code>log.message.timestamp.before.max.ms</code> and 
<code>log.message.timestamp.after.max.ms</code> instead.
+                            See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-937%3A+Improve+Message+Timestamp+Validation">KIP-937</a> for details.
+                        </li>
                     </ul>
                 </li>
                 <li><b>MirrorMaker</b>
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
 
b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
index 36409458a64..e8fb328732c 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
@@ -124,19 +124,6 @@ public class ServerLogConfigs {
     public static final String LOG_MESSAGE_TIMESTAMP_TYPE_DOC = "Define 
whether the timestamp in the message is message create time or log append time. 
The value should be either " +
             "<code>CreateTime</code> or <code>LogAppendTime</code>.";
 
-    /* See `TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG` for 
details */
-    /**
-     * @deprecated since "3.6"
-     */
-    @Deprecated
-    public static final String LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG 
= 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG);
-    @Deprecated
-    public static final long LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DEFAULT = 
Long.MAX_VALUE;
-    public static final String LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC = 
"[DEPRECATED] The maximum difference allowed between the timestamp when a 
broker receives " +
-            "a message and the timestamp specified in the message. If 
log.message.timestamp.type=CreateTime, a message will be rejected " +
-            "if the difference in timestamp exceeds this threshold. This 
configuration is ignored if log.message.timestamp.type=LogAppendTime." +
-            "The maximum timestamp difference allowed should be no greater 
than log.retention.ms to avoid unnecessarily frequent log rolling.";
-
     public static final String LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG);
     public static final long LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DEFAULT = 
Long.MAX_VALUE;
     public static final String LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DOC = "This 
configuration sets the allowable timestamp difference between the " +
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java
 
b/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java
index 66747e74364..8057abe67cf 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java
@@ -86,7 +86,6 @@ public final class ServerTopicConfigSynonyms {
         sameNameWithLogPrefix(TopicConfig.PREALLOCATE_CONFIG),
         sameNameWithLogPrefix(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG),
         sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG),
-        
sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG),
         
sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG),
         
sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG),
         
sameNameWithLogPrefix(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG),
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index 52574021d0c..3ca78b38e06 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -180,8 +180,6 @@ public class LogConfig extends AbstractConfig {
     public static final boolean DEFAULT_UNCLEAN_LEADER_ELECTION_ENABLE = false;
     public static final String DEFAULT_COMPRESSION_TYPE = 
BrokerCompressionType.PRODUCER.name;
     public static final boolean DEFAULT_PREALLOCATE = false;
-    @Deprecated
-    public static final long DEFAULT_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS = 
ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DEFAULT;
 
     public static final boolean DEFAULT_REMOTE_STORAGE_ENABLE = false;
     public static final boolean DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG = false;
@@ -198,12 +196,6 @@ public class LogConfig extends AbstractConfig {
     @SuppressWarnings("deprecation")
     private static final String MESSAGE_FORMAT_VERSION_CONFIG = 
TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG;
 
-    @SuppressWarnings("deprecation")
-    private static final String MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG = 
TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG;
-
-    @SuppressWarnings("deprecation")
-    private static final String MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC = 
TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC;
-
     // Visible for testing
     public static final Set<String> CONFIGS_WITH_NO_SERVER_DEFAULTS = Set.of(
             TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
@@ -250,7 +242,6 @@ public class LogConfig extends AbstractConfig {
             .define(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG, INT, 
ServerLogConfigs.MIN_IN_SYNC_REPLICAS_DEFAULT, atLeast(1), HIGH, 
ServerLogConfigs.MIN_IN_SYNC_REPLICAS_DOC)
             .define(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG, 
STRING, ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_DEFAULT, new 
MetadataVersionValidator(), MEDIUM, 
ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_DOC)
             .define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG, 
STRING, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_DEFAULT, 
ConfigDef.ValidString.in("CreateTime", "LogAppendTime"), MEDIUM, 
ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_DOC)
-            
.define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, LONG, 
ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DEFAULT, atLeast(0), 
MEDIUM, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC)
             
.define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, LONG, 
ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DEFAULT, atLeast(0), 
MEDIUM, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DOC)
             
.define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, LONG, 
ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT, atLeast(0), 
MEDIUM, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DOC)
             .define(ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG, 
CLASS, null, LOW, ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_DOC)
@@ -311,8 +302,6 @@ public class LogConfig extends AbstractConfig {
                         MESSAGE_FORMAT_VERSION_DOC)
                 .define(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, STRING, 
ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_DEFAULT,
                         in("CreateTime", "LogAppendTime"), MEDIUM, 
TopicConfig.MESSAGE_TIMESTAMP_TYPE_DOC)
-                .define(MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, LONG, 
DEFAULT_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS,
-                        atLeast(0), MEDIUM, 
MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC)
                 .define(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, 
LONG, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DEFAULT,
                         atLeast(0), MEDIUM, 
TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DOC)
                 .define(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, 
LONG, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT,
@@ -367,9 +356,6 @@ public class LogConfig extends AbstractConfig {
 
     public final TimestampType messageTimestampType;
 
-    /* See `TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG` for 
details regarding the deprecation */
-    @Deprecated
-    public final long messageTimestampDifferenceMaxMs;
     public final long messageTimestampBeforeMaxMs;
     public final long messageTimestampAfterMaxMs;
     public final List<String> leaderReplicationThrottledReplicas;
@@ -420,9 +406,8 @@ public class LogConfig extends AbstractConfig {
         this.preallocate = getBoolean(TopicConfig.PREALLOCATE_CONFIG);
         this.messageFormatVersion = 
MetadataVersion.fromVersionString(getString(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG));
         this.messageTimestampType = 
TimestampType.forName(getString(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG));
-        this.messageTimestampDifferenceMaxMs = 
getLong(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG);
-        this.messageTimestampBeforeMaxMs = getMessageTimestampBeforeMaxMs();
-        this.messageTimestampAfterMaxMs = getMessageTimestampAfterMaxMs();
+        this.messageTimestampBeforeMaxMs = 
getLong(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG);
+        this.messageTimestampAfterMaxMs = 
getLong(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG);
         this.leaderReplicationThrottledReplicas = 
Collections.unmodifiableList(getList(QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG));
         this.followerReplicationThrottledReplicas = 
Collections.unmodifiableList(getList(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG));
         this.messageDownConversionEnable = 
getBoolean(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG);
@@ -455,28 +440,6 @@ public class LogConfig extends AbstractConfig {
         }
     }
 
-    //In the transition period before messageTimestampDifferenceMaxMs is 
removed, to maintain backward compatibility,
-    // we are using its value if messageTimestampBeforeMaxMs default value 
hasn't changed.
-    private long getMessageTimestampBeforeMaxMs() {
-        final Long messageTimestampBeforeMaxMs = 
getLong(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG);
-        if (!messageTimestampBeforeMaxMs.equals(Long.MAX_VALUE)) {
-            return messageTimestampBeforeMaxMs;
-        } else {
-            return messageTimestampDifferenceMaxMs;
-        }
-    }
-
-    //In the transition period before messageTimestampDifferenceMaxMs is 
removed, to maintain backward compatibility,
-    // we are using its value if messageTimestampAfterMaxMs default value 
hasn't changed.
-    private long getMessageTimestampAfterMaxMs() {
-        final Long messageTimestampAfterMaxMs = 
getLong(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG);
-        if (!messageTimestampAfterMaxMs.equals(Long.MAX_VALUE)) {
-            return messageTimestampAfterMaxMs;
-        } else {
-            return messageTimestampDifferenceMaxMs;
-        }
-    }
-
     public RecordVersion recordVersion() {
         return messageFormatVersion.highestSupportedRecordVersion();
     }
@@ -783,7 +746,6 @@ public class LogConfig extends AbstractConfig {
                 ", preallocate=" + preallocate +
                 ", messageFormatVersion=" + messageFormatVersion +
                 ", messageTimestampType=" + messageTimestampType +
-                ", messageTimestampDifferenceMaxMs=" + 
messageTimestampDifferenceMaxMs +
                 ", leaderReplicationThrottledReplicas=" + 
leaderReplicationThrottledReplicas +
                 ", followerReplicationThrottledReplicas=" + 
followerReplicationThrottledReplicas +
                 ", messageDownConversionEnable=" + messageDownConversionEnable 
+

Reply via email to