This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7567cbc KAFKA-13476: Increase resilience timestamp decoding Kafka Streams (#11535)
7567cbc is described below
commit 7567cbc857eef4f535410a8f6256308197c3b9c8
Author: Richard <[email protected]>
AuthorDate: Thu Jan 6 06:38:10 2022 +0100
KAFKA-13476: Increase resilience timestamp decoding Kafka Streams (#11535)
Reviewers: Matthias J. Sax <[email protected]>
---
.../streams/processor/internals/StreamTask.java | 23 +++++++++++++---------
.../processor/internals/StreamTaskTest.java | 7 +++++++
2 files changed, 21 insertions(+), 9 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 32369c9..6823d2e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -1097,15 +1097,20 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
if (encryptedString.isEmpty()) {
return RecordQueue.UNKNOWN;
}
- final ByteBuffer buffer =
ByteBuffer.wrap(Base64.getDecoder().decode(encryptedString));
- final byte version = buffer.get();
- switch (version) {
- case LATEST_MAGIC_BYTE:
- return buffer.getLong();
- default:
- log.warn("Unsupported offset metadata version found. Supported
version {}. Found version {}.",
- LATEST_MAGIC_BYTE, version);
- return RecordQueue.UNKNOWN;
+ try {
+ final ByteBuffer buffer =
ByteBuffer.wrap(Base64.getDecoder().decode(encryptedString));
+ final byte version = buffer.get();
+ switch (version) {
+ case LATEST_MAGIC_BYTE:
+ return buffer.getLong();
+ default:
+ log.warn("Unsupported offset metadata version found.
Supported version {}. Found version {}.",
+ LATEST_MAGIC_BYTE, version);
+ return RecordQueue.UNKNOWN;
+ }
+ } catch (final Exception exception) {
+ log.warn("Unsupported offset metadata found");
+ return RecordQueue.UNKNOWN;
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 1b3c500..3c1814e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -1184,6 +1184,13 @@ public class StreamTaskTest {
}
@Test
+ public void shouldReturnUnknownTimestampIfInvalidMetadata() {
+ task = createStatelessTask(createConfig("100"));
+ final String invalidBase64String = "{}";
+ assertEquals(RecordQueue.UNKNOWN,
task.decodeTimestamp(invalidBase64String));
+ }
+
+ @Test
public void shouldBeProcessableIfAllPartitionsBuffered() {
task = createStatelessTask(createConfig("100"));
task.initializeIfNeeded();