This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7efb58f3211 KAFKA-16584 Make log processing summary configurable or
debug (#16509)
7efb58f3211 is described below
commit 7efb58f3211cdc3bd0524a79f9b5ce4ef74a77b4
Author: dujian0068 <[email protected]>
AuthorDate: Wed Jul 24 04:09:25 2024 +0800
KAFKA-16584 Make log processing summary configurable or debug (#16509)
KAFKA-16584 Make log processing summary configurable or debug
Reviewers: Matthias Sax <[email protected]>, Bill Bejeck <[email protected]>
---
docs/streams/developer-guide/config-streams.html | 16 ++++++++++++++++
.../java/org/apache/kafka/streams/StreamsConfig.java | 11 ++++++++++-
.../kafka/streams/processor/internals/StreamThread.java | 9 ++++-----
3 files changed, 30 insertions(+), 6 deletions(-)
diff --git a/docs/streams/developer-guide/config-streams.html
b/docs/streams/developer-guide/config-streams.html
index 92a44e0be8c..f50945778a1 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -93,6 +93,7 @@ settings.put(... , ...);</code></pre>
<li><a class="reference internal" href="#task-assignor-class"
id="id39">task.assignor.class</a></li>
<li><a class="reference internal" href="#topology-optimization"
id="id31">topology.optimization</a></li>
<li><a class="reference internal"
href="#windowed-inner-class-serde" id="id38">windowed.inner.class.serde</a></li>
+ <li><a class="reference internal"
href="#log-summary-interval-ms" id="id40">log.summary.interval.ms</a></li>
</ul>
</li>
<li><a class="reference internal"
href="#kafka-consumers-and-producer-configuration-parameters" id="id16">Kafka
consumers and producer configuration parameters</a>
@@ -470,6 +471,11 @@
streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
<td colspan="2">Sets window size for the deserializer in order to
calculate window end times.</td>
<td><code class="docutils literal"><span
class="pre">null</span></code></td>
</tr>
+ <tr class="row-even"><td>log.summary.interval.ms</td>
+ <td>Low</td>
+ <td colspan="2">Added to a windows maintainMs to ensure data is
not deleted from the log prematurely. Allows for clock drift.</td>
+ <td>120000milliseconds (2 minutes)</td>
+ </tr>
</tbody>
</table>
<div class="section" id="acceptable-recovery-lag">
@@ -1066,6 +1072,16 @@
streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksD
</p>
</div></blockquote>
</div>
+ <div class="section" id="log-summary-interval-ms">
+ <h4><a class="toc-backref" href="#id40">log.summary.interval.ms</a><a
class="headerlink" href="#log-summary-interval-ms" title="Permalink to this
headline"></a></h4>
+ <blockquote>
+ <div>
+ This configuration controls the output interval for summary
information.
+ If greater or equal to 0, the summary log will be output according
to the set time interval;
+ If less than 0, summary output is disabled.
+ </div>
+ </blockquote>
+ </div>
<div class="section" id="upgrade-from">
<span id="streams-developer-guide-upgrade-from"></span><h4><a
class="toc-backref" href="#id14">upgrade.from</a><a class="headerlink"
href="#upgrade-from" title="Permalink to this headline"></a></h4>
<blockquote>
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 4c2c2d18a2e..eab567d525d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -834,6 +834,10 @@ public class StreamsConfig extends AbstractConfig {
private static final String TASK_ASSIGNOR_CLASS_DOC = "A task assignor
class or class name implementing the <code>" +
TaskAssignor.class.getName() + "</code> interface. Defaults to the
<code>HighAvailabilityTaskAssignor</code> class.";
+ public static final String LOG_SUMMARY_INTERVAL_MS_CONFIG =
"log.summary.interval.ms";
+ private static final String LOG_SUMMARY_INTERVAL_MS_DOC = "This
configuration controls the output interval for summary information.\n" +
+ "If greater or equal to 0, the summary log will be output
according to the set time interval;\n" +
+ "If less than 0, summary output is disabled.";
/**
* {@code topology.optimization}
* @deprecated since 2.7; use {@link #TOPOLOGY_OPTIMIZATION_CONFIG} instead
@@ -1206,7 +1210,12 @@ public class StreamsConfig extends AbstractConfig {
Type.LONG,
null,
Importance.LOW,
- WINDOW_SIZE_MS_DOC);
+ WINDOW_SIZE_MS_DOC)
+ .define(LOG_SUMMARY_INTERVAL_MS_CONFIG,
+ Type.LONG,
+ 2 * 60 * 1000L,
+ Importance.LOW,
+ LOG_SUMMARY_INTERVAL_MS_DOC);
}
// this is the list of configs for underlying clients
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 72e38821ff6..05c832811ad 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -303,7 +303,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
private final Sensor commitRatioSensor;
private final Sensor failedStreamThreadSensor;
- private static final long LOG_SUMMARY_INTERVAL_MS = 2 * 60 * 1000L; // log
a summary of processing every 2 minutes
+ private final long logSummaryIntervalMs; // the count summary log output
time interval
private long lastLogSummaryMs = -1L;
private long totalRecordsProcessedSinceLastSummary = 0L;
private long totalPunctuatorsSinceLastSummary = 0L;
@@ -643,6 +643,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
this.processingMode = processingMode(config);
this.stateUpdaterEnabled =
InternalConfig.getStateUpdaterEnabled(config.originals());
this.processingThreadsEnabled =
InternalConfig.getProcessingThreadsEnabled(config.originals());
+ this.logSummaryIntervalMs =
config.getLong(StreamsConfig.LOG_SUMMARY_INTERVAL_MS_CONFIG);
}
private static final class InternalConsumerConfig extends ConsumerConfig {
@@ -1069,8 +1070,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
pollRatioSensor.record((double) pollLatency / runOnceLatency, now);
commitRatioSensor.record((double) totalCommitLatency / runOnceLatency,
now);
- final boolean logProcessingSummary = now - lastLogSummaryMs >
LOG_SUMMARY_INTERVAL_MS;
- if (logProcessingSummary) {
+ if (logSummaryIntervalMs > 0 && now - lastLogSummaryMs >
logSummaryIntervalMs) {
log.info("Processed {} total records, ran {} punctuators, and
committed {} total tasks since the last update",
totalRecordsProcessedSinceLastSummary,
totalPunctuatorsSinceLastSummary, totalCommittedSinceLastSummary);
@@ -1142,8 +1142,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
pollRatioSensor.record((double) pollLatency / runOnceLatency, now);
commitRatioSensor.record((double) totalCommitLatency / runOnceLatency,
now);
- final boolean logProcessingSummary = now - lastLogSummaryMs >
LOG_SUMMARY_INTERVAL_MS;
- if (logProcessingSummary) {
+ if (logSummaryIntervalMs > 0 && now - lastLogSummaryMs >
logSummaryIntervalMs) {
log.info("Committed {} total tasks since the last update",
totalCommittedSinceLastSummary);
totalCommittedSinceLastSummary = 0L;