This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7efb58f3211 KAFKA-16584 Make log processing summary configurable or 
debug (#16509)
7efb58f3211 is described below

commit 7efb58f3211cdc3bd0524a79f9b5ce4ef74a77b4
Author: dujian0068 <[email protected]>
AuthorDate: Wed Jul 24 04:09:25 2024 +0800

    KAFKA-16584 Make log processing summary configurable or debug (#16509)
    
    KAFKA-16584 Make log processing summary configurable or debug
    
    Reviewers: Matthias Sax <[email protected]>, Bill Bejeck <[email protected]>
---
 docs/streams/developer-guide/config-streams.html         | 16 ++++++++++++++++
 .../java/org/apache/kafka/streams/StreamsConfig.java     | 11 ++++++++++-
 .../kafka/streams/processor/internals/StreamThread.java  |  9 ++++-----
 3 files changed, 30 insertions(+), 6 deletions(-)

diff --git a/docs/streams/developer-guide/config-streams.html 
b/docs/streams/developer-guide/config-streams.html
index 92a44e0be8c..f50945778a1 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -93,6 +93,7 @@ settings.put(... , ...);</code></pre>
               <li><a class="reference internal" href="#task-assignor-class" 
id="id39">task.assignor.class</a></li>
               <li><a class="reference internal" href="#topology-optimization" 
id="id31">topology.optimization</a></li>
               <li><a class="reference internal" 
href="#windowed-inner-class-serde" id="id38">windowed.inner.class.serde</a></li>
+              <li><a class="reference internal" 
href="#log-summary-interval-ms" id="id40">log.summary.interval.ms</a></li>
             </ul>
           </li>
           <li><a class="reference internal" 
href="#kafka-consumers-and-producer-configuration-parameters" id="id16">Kafka 
consumers and producer configuration parameters</a>
@@ -470,6 +471,11 @@ 
streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
             <td colspan="2">Sets window size for the deserializer in order to 
calculate window end times.</td>
             <td><code class="docutils literal"><span 
class="pre">null</span></code></td>
           </tr>
+          <tr class="row-even"><td>log.summary.interval.ms</td>
+            <td>Low</td>
+            <td colspan="2">The output interval in milliseconds for logging 
the stream thread's processing summary information.</td>
+            <td>120000 milliseconds (2 minutes)</td>
+          </tr>
           </tbody>
         </table>
         <div class="section" id="acceptable-recovery-lag">
@@ -1066,6 +1072,16 @@ 
streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksD
           </p>
         </div></blockquote>
     </div>
+    <div class="section" id="log-summary-interval-ms">
+      <h4><a class="toc-backref" href="#id40">log.summary.interval.ms</a><a 
class="headerlink" href="#log-summary-interval-ms" title="Permalink to this 
headline"></a></h4>
+      <blockquote>
+        <div>
+          This configuration controls the output interval for summary 
information.
+          If greater than 0, the summary log will be output according 
to the set time interval;
+          if less than or equal to 0, summary output is disabled.
+        </div>
+      </blockquote>
+    </div>
     <div class="section" id="upgrade-from">
       <span id="streams-developer-guide-upgrade-from"></span><h4><a 
class="toc-backref" href="#id14">upgrade.from</a><a class="headerlink" 
href="#upgrade-from" title="Permalink to this headline"></a></h4>
       <blockquote>
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 4c2c2d18a2e..eab567d525d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -834,6 +834,10 @@ public class StreamsConfig extends AbstractConfig {
     private static final String TASK_ASSIGNOR_CLASS_DOC = "A task assignor 
class or class name implementing the <code>" +
                                                           
TaskAssignor.class.getName() + "</code> interface. Defaults to the 
<code>HighAvailabilityTaskAssignor</code> class.";
 
+    public static final String LOG_SUMMARY_INTERVAL_MS_CONFIG = 
"log.summary.interval.ms";
+    private static final String LOG_SUMMARY_INTERVAL_MS_DOC = "This 
configuration controls the output interval for summary information.\n" +
+            "If greater or equal to 0, the summary log will be output 
according to the set time interval;\n" +
+            "If less than 0, summary output is disabled.";
     /**
      * {@code topology.optimization}
      * @deprecated since 2.7; use {@link #TOPOLOGY_OPTIMIZATION_CONFIG} instead
@@ -1206,7 +1210,12 @@ public class StreamsConfig extends AbstractConfig {
                     Type.LONG,
                     null,
                     Importance.LOW,
-                    WINDOW_SIZE_MS_DOC);
+                    WINDOW_SIZE_MS_DOC)
+            .define(LOG_SUMMARY_INTERVAL_MS_CONFIG,
+                    Type.LONG,
+                    2 * 60 * 1000L,
+                    Importance.LOW,
+                    LOG_SUMMARY_INTERVAL_MS_DOC);
     }
 
     // this is the list of configs for underlying clients
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 72e38821ff6..05c832811ad 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -303,7 +303,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
     private final Sensor commitRatioSensor;
     private final Sensor failedStreamThreadSensor;
 
-    private static final long LOG_SUMMARY_INTERVAL_MS = 2 * 60 * 1000L; // log 
a summary of processing every 2 minutes
+    private final long logSummaryIntervalMs; // interval in ms between 
processing summary log outputs
     private long lastLogSummaryMs = -1L;
     private long totalRecordsProcessedSinceLastSummary = 0L;
     private long totalPunctuatorsSinceLastSummary = 0L;
@@ -643,6 +643,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         this.processingMode = processingMode(config);
         this.stateUpdaterEnabled = 
InternalConfig.getStateUpdaterEnabled(config.originals());
         this.processingThreadsEnabled = 
InternalConfig.getProcessingThreadsEnabled(config.originals());
+        this.logSummaryIntervalMs = 
config.getLong(StreamsConfig.LOG_SUMMARY_INTERVAL_MS_CONFIG);
     }
 
     private static final class InternalConsumerConfig extends ConsumerConfig {
@@ -1069,8 +1070,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         pollRatioSensor.record((double) pollLatency / runOnceLatency, now);
         commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, 
now);
 
-        final boolean logProcessingSummary = now - lastLogSummaryMs > 
LOG_SUMMARY_INTERVAL_MS;
-        if (logProcessingSummary) {
+        if (logSummaryIntervalMs > 0 && now - lastLogSummaryMs > 
logSummaryIntervalMs) {
             log.info("Processed {} total records, ran {} punctuators, and 
committed {} total tasks since the last update",
                  totalRecordsProcessedSinceLastSummary, 
totalPunctuatorsSinceLastSummary, totalCommittedSinceLastSummary);
 
@@ -1142,8 +1142,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         pollRatioSensor.record((double) pollLatency / runOnceLatency, now);
         commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, 
now);
 
-        final boolean logProcessingSummary = now - lastLogSummaryMs > 
LOG_SUMMARY_INTERVAL_MS;
-        if (logProcessingSummary) {
+        if (logSummaryIntervalMs > 0 && now - lastLogSummaryMs > 
logSummaryIntervalMs) {
             log.info("Committed {} total tasks since the last update", 
totalCommittedSinceLastSummary);
 
             totalCommittedSinceLastSummary = 0L;

Reply via email to