This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.1 by this push:
new 19c31ec0eea KAFKA-14809 Fix logging conditional on WorkerSourceTask (#13386)
19c31ec0eea is described below
commit 19c31ec0eea6c0a9f6c1fe807dbb5afc34de6825
Author: Hector Geraldino <[email protected]>
AuthorDate: Thu Mar 16 08:39:31 2023 -0400
KAFKA-14809 Fix logging conditional on WorkerSourceTask (#13386)
Reviewers: Chris Egerton <[email protected]>
---
.../kafka/connect/runtime/WorkerSourceTask.java | 18 +++++++++---------
metadata/.jqwik-database | Bin 0 -> 4 bytes
2 files changed, 9 insertions(+), 9 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index ed366763a36..76d674d0fe7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -480,24 +480,24 @@ class WorkerSourceTask extends WorkerTask {
this.committableOffsets = CommittableOffsets.EMPTY;
}
- if (committableOffsets.isEmpty()) {
- log.info("{} Either no records were produced by the task since the
last offset commit, "
- + "or every record has been filtered out by a
transformation "
+ if (offsetsToCommit.isEmpty()) {
+ log.debug("{} Either no records were produced by the task since
the last offset commit, "
+ + "or every record has been filtered out by a
transformation "
+ "or dropped due to transformation or conversion errors.",
this
);
// We continue with the offset commit process here instead of
simply returning immediately
// in order to invoke SourceTask::commit and record metrics for a
successful offset commit
} else {
- log.info("{} Committing offsets for {} acknowledged messages",
this, committableOffsets.numCommittableMessages());
- if (committableOffsets.hasPending()) {
+ log.info("{} Committing offsets for {} acknowledged messages",
this, offsetsToCommit.numCommittableMessages());
+ if (offsetsToCommit.hasPending()) {
log.debug("{} There are currently {} pending messages spread
across {} source partitions whose offsets will not be committed. "
+ "The source partition with the most pending
messages is {}, with {} pending messages",
this,
- committableOffsets.numUncommittableMessages(),
- committableOffsets.numDeques(),
- committableOffsets.largestDequePartition(),
- committableOffsets.largestDequeSize()
+ offsetsToCommit.numUncommittableMessages(),
+ offsetsToCommit.numDeques(),
+ offsetsToCommit.largestDequePartition(),
+ offsetsToCommit.largestDequeSize()
);
} else {
log.debug("{} There are currently no pending messages for this
offset commit; "
diff --git a/metadata/.jqwik-database b/metadata/.jqwik-database
new file mode 100644
index 00000000000..711006c3d3b
Binary files /dev/null and b/metadata/.jqwik-database differ