This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f081f3b MINOR: Reset `streamTime` on clear (#8250)
f081f3b is described below
commit f081f3bd7619194e567daaa53b52216b75925adf
Author: Matthias J. Sax <[email protected]>
AuthorDate: Sat Mar 7 08:22:55 2020 -0800
MINOR: Reset `streamTime` on clear (#8250)
Reviewers: Guozhang Wang <[email protected]>
---
.../apache/kafka/streams/processor/internals/PartitionGroup.java | 7 +++----
.../org/apache/kafka/streams/processor/internals/RecordQueue.java | 5 ++---
.../kafka/streams/processor/internals/PartitionGroupTest.java | 2 +-
3 files changed, 6 insertions(+), 8 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 32610b5..4514d8d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -220,15 +220,14 @@ public class PartitionGroup {
void close() {
clear();
-
- streamTime = RecordQueue.UNKNOWN;
}
void clear() {
- nonEmptyQueuesByTime.clear();
- totalBuffered = 0;
for (final RecordQueue queue : partitionQueues.values()) {
queue.clear();
}
+ nonEmptyQueuesByTime.clear();
+ totalBuffered = 0;
+ streamTime = RecordQueue.UNKNOWN;
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 0beade9..dff2f44 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -47,7 +47,7 @@ public class RecordQueue {
private final ArrayDeque<ConsumerRecord<byte[], byte[]>> fifoQueue;
private StampedRecord headRecord = null;
- private long partitionTime;
+ private long partitionTime = UNKNOWN;
private final Sensor droppedRecordsSensor;
@@ -74,7 +74,6 @@ public class RecordQueue {
droppedRecordsSensor
);
this.log = logContext.logger(RecordQueue.class);
- setPartitionTime(UNKNOWN);
}
void setPartitionTime(final long partitionTime) {
@@ -167,7 +166,7 @@ public class RecordQueue {
public void clear() {
fifoQueue.clear();
headRecord = null;
- setPartitionTime(UNKNOWN);
+ partitionTime = UNKNOWN;
}
private void updateHead() {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index d38c52d..40e6451 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -392,7 +392,7 @@ public class PartitionGroupTest {
group.clear();
assertThat(group.numBuffered(), equalTo(0));
- assertThat(group.streamTime(), equalTo(3L));
+ assertThat(group.streamTime(), equalTo(RecordQueue.UNKNOWN));
assertThat(group.nextRecord(new PartitionGroup.RecordInfo()),
equalTo(null));
assertThat(group.partitionTimestamp(partition1),
equalTo(RecordQueue.UNKNOWN));