This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 953ec98 MINOR: Improve Kafka Streams JavaDocs with regard to record metadata (#10810)
953ec98 is described below
commit 953ec9810099d6e5f41541de46c0ceebf4372790
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Jun 9 22:51:36 2021 -0700
MINOR: Improve Kafka Streams JavaDocs with regard to record metadata (#10810)
Reviewers: Luke Chen <[email protected]>, Josep Prat <[email protected]>, John Roesler <[email protected]>
---
.../kafka/streams/processor/ProcessorContext.java | 96 +++++++++++++++-------
.../apache/kafka/streams/processor/Punctuator.java | 11 +++
.../kafka/streams/processor/RecordContext.java | 83 +++++++++++++++----
.../streams/processor/api/ProcessorContext.java | 26 +++---
.../streams/processor/api/RecordMetadata.java | 41 ++++++++-
.../internals/ProcessorRecordContext.java | 77 ++++++++---------
.../streams/state/internals/LRUCacheEntry.java | 3 +-
...KStreamSessionWindowAggregateProcessorTest.java | 23 +++---
.../internals/KTableKTableInnerJoinTest.java | 3 +-
.../internals/KTableKTableLeftJoinTest.java | 3 +-
.../internals/KTableKTableOuterJoinTest.java | 3 +-
.../internals/KTableKTableRightJoinTest.java | 3 +-
.../internals/KTableTransformValuesTest.java | 2 +-
.../KTableSuppressProcessorMetricsTest.java | 7 +-
.../suppress/KTableSuppressProcessorTest.java | 47 +++++------
.../internals/AbstractProcessorContextTest.java | 2 +-
.../internals/ProcessorRecordContextTest.java | 21 ++++-
.../internals/AbstractWindowBytesStoreTest.java | 3 +-
.../streams/state/internals/BufferValueTest.java | 21 ++---
.../CachingInMemoryKeyValueStoreTest.java | 5 +-
.../internals/CachingInMemorySessionStoreTest.java | 7 +-
.../CachingPersistentSessionStoreTest.java | 5 +-
.../CachingPersistentWindowStoreTest.java | 5 +-
.../streams/state/internals/NamedCacheTest.java | 65 +++++----------
.../streams/state/internals/ThreadCacheTest.java | 27 +++---
.../internals/TimeOrderedKeyValueBufferTest.java | 14 ++--
.../streams/processor/MockProcessorContext.java | 12 +++
.../kafka/streams/MockProcessorContextTest.java | 3 +-
28 files changed, 385 insertions(+), 233 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index aa57463..a598e72 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.errors.StreamsException;
import java.io.File;
import java.time.Duration;
import java.util.Map;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
/**
* Processor context interface.
@@ -33,49 +34,49 @@ import java.util.Map;
public interface ProcessorContext {
/**
- * Returns the application id.
+ * Return the application id.
*
* @return the application id
*/
String applicationId();
/**
- * Returns the task id.
+ * Return the task id.
*
* @return the task id
*/
TaskId taskId();
/**
- * Returns the default key serde.
+ * Return the default key serde.
*
* @return the key serializer
*/
Serde<?> keySerde();
/**
- * Returns the default value serde.
+ * Return the default value serde.
*
* @return the value serializer
*/
Serde<?> valueSerde();
/**
- * Returns the state directory for the partition.
+ * Return the state directory for the partition.
*
* @return the state directory
*/
File stateDir();
/**
- * Returns Metrics instance.
+ * Return Metrics instance.
*
* @return StreamsMetrics
*/
StreamsMetrics metrics();
/**
- * Registers and possibly restores the specified storage engine.
+ * Register and possibly restore the specified storage engine.
*
* @param store the storage engine
* @param stateRestoreCallback the restoration callback logic for log-backed state stores upon restart
@@ -98,7 +99,7 @@ public interface ProcessorContext {
<S extends StateStore> S getStateStore(final String name);
/**
- * Schedules a periodic operation for processors. A processor may call this method during
+ * Schedule a periodic operation for processors. A processor may call this method during
* {@link Processor#init(ProcessorContext) initialization} or
* {@link Processor#process(Object, Object) processing} to
* schedule a periodic callback &mdash; called a punctuation &mdash; to {@link Punctuator#punctuate(long)}.
@@ -134,18 +135,24 @@ public interface ProcessorContext {
final Punctuator callback);
/**
- * Forwards a key/value pair to all downstream processors.
+ * Forward a key/value pair to all downstream processors.
* Uses the input record's timestamp as the timestamp for the output record.
*
+ * <p> If this method is called within {@link Punctuator#punctuate(long)} the record that
+ * is sent downstream won't have any associated record metadata like topic, partition, or offset.
+ *
* @param key key
* @param value value
*/
<K, V> void forward(final K key, final V value);
/**
- * Forwards a key/value pair to the specified downstream processors.
+ * Forward a key/value pair to the specified downstream processors.
* Can be used to set the timestamp of the output record.
*
+ * <p> If this method is called within {@link Punctuator#punctuate(long)} the record that
+ * is sent downstream won't have any associated record metadata like topic, partition, or offset.
+ *
* @param key key
* @param value value
* @param to the options to use when forwarding
@@ -153,48 +160,84 @@ public interface ProcessorContext {
<K, V> void forward(final K key, final V value, final To to);
/**
- * Requests a commit.
+ * Request a commit.
*/
void commit();
/**
- * Returns the topic name of the current input record; could be null if it is not
- * available (for example, if this method is invoked from the punctuate call).
+ * Return the topic name of the current input record; could be {@code null} if it is not
+ * available.
+ *
+ * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+ * punctuation callback}, or while processing a record that was forwarded by a punctuation
+ * callback, the record won't have an associated topic.
+ * Another example is
+ * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+ * (and siblings), which do not always guarantee to provide a valid topic name, as they might be
+ * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
*
* @return the topic name
*/
String topic();
/**
- * Returns the partition id of the current input record; could be -1 if it is not
- * available (for example, if this method is invoked from the punctuate call).
+ * Return the partition id of the current input record; could be {@code -1} if it is not
+ * available.
+ *
+ * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+ * punctuation callback}, or while processing a record that was forwarded by a punctuation
+ * callback, the record won't have an associated partition id.
+ * Another example is
+ * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+ * (and siblings), which do not always guarantee to provide a valid partition id, as they might be
+ * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
*
* @return the partition id
*/
int partition();
/**
- * Returns the offset of the current input record; could be -1 if it is not
- * available (for example, if this method is invoked from the punctuate call).
+ * Return the offset of the current input record; could be {@code -1} if it is not
+ * available.
+ *
+ * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+ * punctuation callback}, or while processing a record that was forwarded by a punctuation
+ * callback, the record won't have an associated offset.
+ * Another example is
+ * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+ * (and siblings), which do not always guarantee to provide a valid offset, as they might be
+ * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
*
* @return the offset
*/
long offset();
/**
- * Returns the headers of the current input record; could be null if it is not
- * available (for example, if this method is invoked from the punctuate call).
+ * Return the headers of the current input record; could be empty headers if they are not
+ * available.
+ *
+ * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+ * punctuation callback}, or while processing a record that was forwarded by a punctuation
+ * callback, the record might not have any associated headers.
+ * Another example is
+ * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+ * (and siblings), which do not always guarantee to provide valid headers, as they might be
+ * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
*
* @return the headers
*/
Headers headers();
/**
- * Returns the current timestamp.
+ * Return the current timestamp.
*
* <p> If it is triggered while processing a record streamed from the source processor,
* timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
* {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}.
+ * Note that an upstream {@link Processor} might have set a new timestamp by calling
+ * {@link ProcessorContext#forward(Object, Object, To) forward(..., To.all().withTimestamp(...))}.
+ * In particular, some Kafka Streams DSL operators set result record timestamps explicitly,
+ * to guarantee deterministic results.
*
* <p> If it is triggered while processing a record generated not from the source processor (for example,
* if this method is invoked from the punctuate call), timestamp is defined as the current
@@ -205,7 +248,7 @@ public interface ProcessorContext {
long timestamp();
/**
- * Returns all the application config properties as key/value pairs.
+ * Return all the application config properties as key/value pairs.
*
* <p> The config properties are defined in the {@link org.apache.kafka.streams.StreamsConfig}
* object and associated to the ProcessorContext.
@@ -220,7 +263,7 @@ public interface ProcessorContext {
Map<String, Object> appConfigs();
/**
- * Returns all the application config properties with the given key prefix, as key/value pairs
+ * Return all the application config properties with the given key prefix, as key/value pairs
* stripping the prefix.
*
* <p> The config properties are defined in the {@link org.apache.kafka.streams.StreamsConfig}
@@ -234,8 +277,7 @@ public interface ProcessorContext {
/**
* Return the current system timestamp (also called wall-clock time) in milliseconds.
*
- * <p>
- * Note: this method returns the internally cached system timestamp from the Kafka Stream runtime.
+ * <p> Note: this method returns the internally cached system timestamp from the Kafka Streams runtime.
* Thus, it may return a different value compared to {@code System.currentTimeMillis()}.
*
* @return the current system timestamp in milliseconds
@@ -245,13 +287,11 @@ public interface ProcessorContext {
/**
* Return the current stream-time in milliseconds.
*
- * <p>
- * Stream-time is the maximum observed {@link TimestampExtractor record timestamp} so far
+ * <p> Stream-time is the maximum observed {@link TimestampExtractor record timestamp} so far
* (including the currently processed record), i.e., it can be considered a high-watermark.
* Stream-time is tracked on a per-task basis and is preserved across restarts and during task migration.
- * <p>
*
- * Note: this method is not supported for global processors (cf. {@link Topology#addGlobalStore} (...)
+ * <p> Note: this method is not supported for global processors (cf. {@link Topology#addGlobalStore} (...)
* and {@link StreamsBuilder#addGlobalStore} (...),
* because there is no concept of stream-time for this case.
* Calling this method in a global processor will result in an {@link UnsupportedOperationException}.
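To illustrate the updated contract, here is a minimal sketch of a processor that guards against missing record metadata (class name hypothetical, not part of this commit; old Processor API):

    import org.apache.kafka.streams.processor.AbstractProcessor;

    // Hypothetical example: defensive access to record metadata under the
    // contract documented above.
    public class MetadataAwareProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(final String key, final String value) {
            // topic() may be null and partition()/offset() may be -1, e.g. during
            // a punctuation callback or an "out-of-band" DSL optimization.
            if (context().topic() != null && context().partition() >= 0 && context().offset() >= 0) {
                System.out.printf("record from %s-%d@%d%n",
                    context().topic(), context().partition(), context().offset());
            }
            context().forward(key, value);
        }
    }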
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java b/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
index 1886dad..1cbde6d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor;
import java.time.Duration;
+import org.apache.kafka.streams.processor.api.Record;
/**
* A functional interface used as an argument to {@link ProcessorContext#schedule(Duration, PunctuationType, Punctuator)}.
@@ -28,6 +29,16 @@ public interface Punctuator {
/**
* Perform the scheduled periodic operation.
*
+ * <p> If this method accesses {@link ProcessorContext} or
+ * {@link org.apache.kafka.streams.processor.api.ProcessorContext}, record metadata like topic,
+ * partition, and offset or {@link org.apache.kafka.streams.processor.api.RecordMetadata} won't
+ * be available.
+ *
+ * <p> Furthermore, for any record that is sent downstream via {@link ProcessorContext#forward(Object, Object)}
+ * or {@link org.apache.kafka.streams.processor.api.ProcessorContext#forward(Record)}, there
+ * won't be any record metadata. If {@link ProcessorContext#forward(Object, Object)} is used,
+ * it's also not possible to set record headers.
+ *
* @param timestamp when the operation is being called, depending on {@link PunctuationType}
*/
void punctuate(long timestamp);
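For illustration, a minimal sketch (hypothetical class, not part of this commit) of a punctuation whose forwarded records carry no metadata:

    import java.time.Duration;
    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.PunctuationType;

    public class TickingProcessor extends AbstractProcessor<String, String> {
        @Override
        public void init(final ProcessorContext context) {
            super.init(context);
            // The Punctuator lambda below forwards a record; downstream processors
            // will see topic() == null, partition() == -1, and offset() == -1 for
            // it, and forward(key, value) offers no way to attach headers.
            context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME,
                timestamp -> context.forward("tick", String.valueOf(timestamp)));
        }

        @Override
        public void process(final String key, final String value) {
            context().forward(key, value);
        }
    }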
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java
index 5819a46..f0b8ff1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java
@@ -17,39 +17,94 @@
package org.apache.kafka.streams.processor;
import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
/**
* The context associated with the current record being processed by
* an {@link Processor}
*/
public interface RecordContext {
+
/**
- * @return The offset of the original record received from Kafka;
- * could be -1 if it is not available
+ * Return the topic name of the current input record; could be {@code null} if it is not
+ * available.
+ *
+ * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+ * punctuation callback}, or while processing a record that was forwarded by a punctuation
+ * callback, the record won't have an associated topic.
+ * Another example is
+ * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+ * (and siblings), which do not always guarantee to provide a valid topic name, as they might be
+ * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+ *
+ * @return the topic name
*/
- long offset();
+ String topic();
/**
- * @return The timestamp extracted from the record received from Kafka;
- * could be -1 if it is not available
+ * Return the partition id of the current input record; could be {@code -1} if it is not
+ * available.
+ *
+ * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+ * punctuation callback}, or while processing a record that was forwarded by a punctuation
+ * callback, the record won't have an associated partition id.
+ * Another example is
+ * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+ * (and siblings), which do not always guarantee to provide a valid partition id, as they might be
+ * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+ *
+ * @return the partition id
*/
- long timestamp();
+ int partition();
/**
- * @return The topic the record was received on;
- * could be null if it is not available
+ * Return the offset of the current input record; could be {@code -1} if it is not
+ * available.
+ *
+ * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+ * punctuation callback}, or while processing a record that was forwarded by a punctuation
+ * callback, the record won't have an associated offset.
+ * Another example is
+ * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+ * (and siblings), which do not always guarantee to provide a valid offset, as they might be
+ * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+ *
+ * @return the offset
*/
- String topic();
+ long offset();
/**
- * @return The partition the record was received on;
- * could be -1 if it is not available
+ * Return the current timestamp.
+ *
+ * <p> If it is triggered while processing a record streamed from the source processor,
+ * timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
+ * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}.
+ * Note that an upstream {@link Processor} might have set a new timestamp by calling
+ * {@link ProcessorContext#forward(Object, Object, To) forward(..., To.all().withTimestamp(...))}.
+ * In particular, some Kafka Streams DSL operators set result record timestamps explicitly,
+ * to guarantee deterministic results.
+ *
+ * <p> If it is triggered while processing a record generated not from the source processor (for example,
+ * if this method is invoked from the punctuate call), timestamp is defined as the current
+ * task's stream time, which is defined as the largest timestamp of any record processed by the task.
+ *
+ * @return the timestamp
*/
- int partition();
+ long timestamp();
/**
- * @return The headers from the record received from Kafka;
- * could be null if it is not available
+ * Return the headers of the current input record; could be empty headers if they are not
+ * available.
+ *
+ * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+ * punctuation callback}, or while processing a record that was forwarded by a punctuation
+ * callback, the record might not have any associated headers.
+ * Another example is
+ * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+ * (and siblings), which do not always guarantee to provide valid headers, as they might be
+ * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+ *
+ * @return the headers
*/
Headers headers();
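As an illustration of these semantics, a minimal sketch (hypothetical class, not part of this commit) of a TopicNameExtractor, one place where a RecordContext is handed to user code, that null-checks topic():

    import org.apache.kafka.streams.processor.RecordContext;
    import org.apache.kafka.streams.processor.TopicNameExtractor;

    public class SourceTopicRouter implements TopicNameExtractor<String, String> {
        @Override
        public String extract(final String key, final String value, final RecordContext recordContext) {
            // topic() may be null, e.g. for records forwarded from a punctuation
            // or produced by an "out-of-band" DSL operation.
            final String sourceTopic = recordContext.topic();
            return sourceTopic != null ? sourceTopic + "-routed" : "unrouted";
        }
    }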
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
index c591d51..d110a76 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
@@ -39,23 +39,23 @@ import java.util.Optional;
public interface ProcessorContext<KForward, VForward> {
/**
- * Returns the application id.
+ * Return the application id.
*
* @return the application id
*/
String applicationId();
/**
- * Returns the task id.
+ * Return the task id.
*
* @return the task id
*/
TaskId taskId();
/**
- * The metadata of the source record, if is one. Processors may be invoked to
+ * Return the metadata of the current record if available. Processors may be invoked to
* process a source record from an input topic, to run a scheduled punctuation
- * (see {@link ProcessorContext#schedule(Duration, PunctuationType, Punctuator)} ),
+ * (see {@link ProcessorContext#schedule(Duration, PunctuationType, Punctuator)}),
* or because a parent processor called {@link ProcessorContext#forward(Record)}.
* <p>
* In the case of a punctuation, there is no source record, so this metadata would be
@@ -74,28 +74,28 @@ public interface ProcessorContext<KForward, VForward> {
Optional<RecordMetadata> recordMetadata();
/**
- * Returns the default key serde.
+ * Return the default key serde.
*
* @return the key serializer
*/
Serde<?> keySerde();
/**
- * Returns the default value serde.
+ * Return the default value serde.
*
* @return the value serializer
*/
Serde<?> valueSerde();
/**
- * Returns the state directory for the partition.
+ * Return the state directory for the partition.
*
* @return the state directory
*/
File stateDir();
/**
- * Returns Metrics instance.
+ * Return Metrics instance.
*
* @return StreamsMetrics
*/
@@ -113,7 +113,7 @@ public interface ProcessorContext<KForward, VForward> {
<S extends StateStore> S getStateStore(final String name);
/**
- * Schedules a periodic operation for processors. A processor may call this method during
+ * Schedule a periodic operation for processors. A processor may call this method during
* {@link Processor#init(ProcessorContext) initialization} or
* {@link Processor#process(Record)} processing} to
* schedule a periodic callback &mdash; called a punctuation &mdash; to {@link Punctuator#punctuate(long)}.
@@ -149,7 +149,7 @@ public interface ProcessorContext<KForward, VForward> {
final Punctuator callback);
/**
- * Forwards a record to all child processors.
+ * Forward a record to all child processors.
* <p>
* Note that the forwarded {@link Record} is shared between the parent and child
* processors. And of course, the parent may forward the same object to multiple children,
@@ -207,7 +207,7 @@ public interface ProcessorContext<KForward, VForward> {
<K extends KForward, V extends VForward> void forward(Record<K, V> record);
/**
- * Forwards a record to the specified child processor.
+ * Forward a record to the specified child processor.
* See {@link ProcessorContext#forward(Record)} for considerations.
*
* @param record The record to forward
@@ -217,7 +217,7 @@ public interface ProcessorContext<KForward, VForward> {
<K extends KForward, V extends VForward> void forward(Record<K, V> record,
final String childName);
/**
- * Requests a commit.
+ * Request a commit.
*/
void commit();
@@ -237,7 +237,7 @@ public interface ProcessorContext<KForward, VForward> {
Map<String, Object> appConfigs();
/**
- * Returns all the application config properties with the given key prefix, as key/value pairs
+ * Return all the application config properties with the given key prefix, as key/value pairs
* stripping the prefix.
*
* <p> The config properties are defined in the {@link org.apache.kafka.streams.StreamsConfig}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java
index 532104a..ab88b89 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java
@@ -16,19 +16,54 @@
*/
package org.apache.kafka.streams.processor.api;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+
public interface RecordMetadata {
/**
- * @return The topic of the original record received from Kafka
+ * Return the topic name of the current input record; could be {@code null} if it is not
+ * available.
+ *
+ * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+ * punctuation callback}, or while processing a record that was forwarded by a punctuation
+ * callback, the record won't have an associated topic.
+ * Another example is
+ * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+ * (and siblings), which do not always guarantee to provide a valid topic name, as they might be
+ * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+ *
+ * @return the topic name
*/
String topic();
/**
- * @return The partition of the original record received from Kafka
+ * Return the partition id of the current input record; could be {@code -1} if it is not
+ * available.
+ *
+ * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+ * punctuation callback}, or while processing a record that was forwarded by a punctuation
+ * callback, the record won't have an associated partition id.
+ * Another example is
+ * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+ * (and siblings), which do not always guarantee to provide a valid partition id, as they might be
+ * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+ *
+ * @return the partition id
*/
int partition();
/**
- * @return The offset of the original record received from Kafka
+ * Return the offset of the current input record; could be {@code -1} if it is not
+ * available.
+ *
+ * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+ * punctuation callback}, or while processing a record that was forwarded by a punctuation
+ * callback, the record won't have an associated offset.
+ * Another example is
+ * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+ * (and siblings), which do not always guarantee to provide a valid offset, as they might be
+ * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+ *
+ * @return the offset
*/
long offset();
}
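In the new Processor API the same "metadata may be absent" rule surfaces as an empty Optional instead of null/-1 sentinels; a minimal sketch (hypothetical class, not part of this commit):

    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;

    public class MetadataLoggingProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(final ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(final Record<String, String> record) {
            // Empty for punctuation-forwarded records; no null/-1 checks needed.
            context.recordMetadata().ifPresent(metadata ->
                System.out.printf("record from %s-%d@%d%n",
                    metadata.topic(), metadata.partition(), metadata.offset()));
            context.forward(record);
        }
    }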
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index 7eb4dc8..07e9ab3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -43,12 +43,11 @@ public class ProcessorRecordContext implements RecordContext, RecordMetadata {
final int partition,
final String topic,
final Headers headers) {
-
this.timestamp = timestamp;
this.offset = offset;
this.topic = topic;
this.partition = partition;
- this.headers = headers;
+ this.headers = Objects.requireNonNull(headers);
}
@Override
@@ -84,13 +83,11 @@ public class ProcessorRecordContext implements RecordContext, RecordMetadata {
size += topic.toCharArray().length;
}
size += Integer.BYTES; // partition
- if (headers != null) {
- for (final Header header : headers) {
- size += header.key().toCharArray().length;
- final byte[] value = header.value();
- if (value != null) {
- size += value.length;
- }
+ for (final Header header : headers) {
+ size += header.key().toCharArray().length;
+ final byte[] value = header.value();
+ if (value != null) {
+ size += value.length;
}
}
return size;
@@ -109,26 +106,22 @@ public class ProcessorRecordContext implements RecordContext, RecordMetadata {
size += Integer.BYTES; // partition
size += Integer.BYTES; // number of headers
- if (headers == null) {
- headerKeysBytes = headerValuesBytes = null;
- } else {
- final Header[] headers = this.headers.toArray();
- headerKeysBytes = new byte[headers.length][];
- headerValuesBytes = new byte[headers.length][];
-
- for (int i = 0; i < headers.length; i++) {
- size += 2 * Integer.BYTES; // sizes of key and value
-
- final byte[] keyBytes = headers[i].key().getBytes(UTF_8);
- size += keyBytes.length;
- final byte[] valueBytes = headers[i].value();
- if (valueBytes != null) {
- size += valueBytes.length;
- }
-
- headerKeysBytes[i] = keyBytes;
- headerValuesBytes[i] = valueBytes;
+ final Header[] headers = this.headers.toArray();
+ headerKeysBytes = new byte[headers.length][];
+ headerValuesBytes = new byte[headers.length][];
+
+ for (int i = 0; i < headers.length; i++) {
+ size += 2 * Integer.BYTES; // sizes of key and value
+
+ final byte[] keyBytes = headers[i].key().getBytes(UTF_8);
+ size += keyBytes.length;
+ final byte[] valueBytes = headers[i].value();
+ if (valueBytes != null) {
+ size += valueBytes.length;
}
+
+ headerKeysBytes[i] = keyBytes;
+ headerValuesBytes[i] = valueBytes;
}
final ByteBuffer buffer = ByteBuffer.allocate(size);
@@ -140,20 +133,16 @@ public class ProcessorRecordContext implements RecordContext, RecordMetadata {
buffer.put(topicBytes);
buffer.putInt(partition);
- if (headers == null) {
- buffer.putInt(-1);
- } else {
- buffer.putInt(headerKeysBytes.length);
- for (int i = 0; i < headerKeysBytes.length; i++) {
- buffer.putInt(headerKeysBytes[i].length);
- buffer.put(headerKeysBytes[i]);
-
- if (headerValuesBytes[i] != null) {
- buffer.putInt(headerValuesBytes[i].length);
- buffer.put(headerValuesBytes[i]);
- } else {
- buffer.putInt(-1);
- }
+ buffer.putInt(headerKeysBytes.length);
+ for (int i = 0; i < headerKeysBytes.length; i++) {
+ buffer.putInt(headerKeysBytes[i].length);
+ buffer.put(headerKeysBytes[i]);
+
+ if (headerValuesBytes[i] != null) {
+ buffer.putInt(headerValuesBytes[i].length);
+ buffer.put(headerValuesBytes[i]);
+ } else {
+ buffer.putInt(-1);
}
}
@@ -172,8 +161,8 @@ public class ProcessorRecordContext implements RecordContext, RecordMetadata {
final int partition = buffer.getInt();
final int headerCount = buffer.getInt();
final Headers headers;
- if (headerCount == -1) {
- headers = null;
+ if (headerCount == -1) { // keep for backward compatibility
+ headers = new RecordHeaders();
} else {
final Header[] headerArr = new Header[headerCount];
for (int i = 0; i < headerCount; i++) {
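The deserialization change above keeps the wire format readable for entries written before this change: a header count of -1 (formerly "null headers") now materializes as empty headers. A minimal sketch of that decode decision (standalone illustration, not part of this commit):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.common.header.internals.RecordHeaders;

    final class HeaderCountDecodeSketch {
        static Headers readHeaders(final ByteBuffer buffer) {
            final int headerCount = buffer.getInt();
            if (headerCount == -1) {
                // Legacy marker written by older versions for null headers;
                // decode as empty headers since null is no longer allowed.
                return new RecordHeaders();
            }
            final Headers headers = new RecordHeaders();
            for (int i = 0; i < headerCount; i++) {
                // per-header key/value decoding elided; see deserialize() above
            }
            return headers;
        }
    }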
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
index 0f1a1ac..f4233c7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import java.util.Objects;
@@ -31,7 +32,7 @@ class LRUCacheEntry {
LRUCacheEntry(final byte[] value) {
- this(value, null, false, -1, -1, -1, "");
+ this(value, new RecordHeaders(), false, -1, -1, -1, "");
}
LRUCacheEntry(final byte[] value,
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index b2d0977..94bad59 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.LogContext;
@@ -380,7 +381,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
public void shouldLogAndMeterWhenSkippingNullKeyWithBuiltInMetrics() {
setup(false);
context.setRecordContext(
- new ProcessorRecordContext(-1, -2, -3, "topic", null)
+ new ProcessorRecordContext(-1, -2, -3, "topic", new RecordHeaders())
);
try (final LogCaptureAppender appender =
@@ -413,22 +414,22 @@ public class KStreamSessionWindowAggregateProcessorTest {
processor.init(context);
// dummy record to establish stream time = 0
- context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+ context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders()));
processor.process("dummy", "dummy");
// record arrives on time, should not be skipped
- context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+ context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders()));
processor.process("OnTime1", "1");
// dummy record to advance stream time = 1
- context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null));
+ context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", new RecordHeaders()));
processor.process("dummy", "dummy");
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamSessionWindowAggregate.class)) {
// record is late
- context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+ context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders()));
processor.process("Late1", "1");
assertThat(
@@ -481,27 +482,27 @@ public class KStreamSessionWindowAggregateProcessorTest {
LogCaptureAppender.createAndRegister(KStreamSessionWindowAggregate.class)) {
// dummy record to establish stream time = 0
- context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+ context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders()));
processor.process("dummy", "dummy");
// record arrives on time, should not be skipped
- context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+ context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders()));
processor.process("OnTime1", "1");
// dummy record to advance stream time = 1
- context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null));
+ context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", new RecordHeaders()));
processor.process("dummy", "dummy");
// delayed record arrives on time, should not be skipped
- context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+ context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders()));
processor.process("OnTime2", "1");
// dummy record to advance stream time = 2
- context.setRecordContext(new ProcessorRecordContext(2, -2, -3, "topic", null));
+ context.setRecordContext(new ProcessorRecordContext(2, -2, -3, "topic", new RecordHeaders()));
processor.process("dummy", "dummy");
// delayed record arrives late
- context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+ context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders()));
processor.process("Late1", "1");
assertThat(
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
index c7de7f7..991ee59 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValueTimestamp;
@@ -257,7 +258,7 @@ public class KTableKTableInnerJoinTest {
).get();
final MockProcessorContext context = new MockProcessorContext(props);
- context.setRecordMetadata("left", -1, -2, null, -3);
+ context.setRecordMetadata("left", -1, -2, new RecordHeaders(), -3);
join.init(context);
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KTableKTableInnerJoin.class)) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 451725f..0d7f4b9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
@@ -523,7 +524,7 @@ public class KTableKTableLeftJoinTest {
).get();
final MockProcessorContext context = new MockProcessorContext(props);
- context.setRecordMetadata("left", -1, -2, null, -3);
+ context.setRecordMetadata("left", -1, -2, new RecordHeaders(), -3);
join.init(context);
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KTableKTableLeftJoin.class)) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index 40da184..e41d654 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
@@ -414,7 +415,7 @@ public class KTableKTableOuterJoinTest {
).get();
final MockProcessorContext context = new MockProcessorContext(props);
- context.setRecordMetadata("left", -1, -2, null, -3);
+ context.setRecordMetadata("left", -1, -2, new RecordHeaders(), -3);
join.init(context);
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KTableKTableOuterJoin.class)) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java
index c5e211d..8e9ab43 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
@@ -48,7 +49,7 @@ public class KTableKTableRightJoinTest {
props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, StreamsConfig.METRICS_LATEST);
final MockProcessorContext context = new MockProcessorContext(props);
- context.setRecordMetadata("left", -1, -2, null, -3);
+ context.setRecordMetadata("left", -1, -2, new RecordHeaders(), -3);
join.init(context);
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KTableKTableRightJoin.class)) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index 390f408..5ab1512 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -224,7 +224,7 @@ public class KTableTransformValuesTest {
final ProcessorRecordContext recordContext = new ProcessorRecordContext(
42L,
23L,
- 1,
+ -1,
"foo",
new RecordHeaders()
);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
index 3789ad2..fa4eada 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals.suppress;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
@@ -52,7 +53,7 @@ import static org.hamcrest.core.Is.is;
public class KTableSuppressProcessorMetricsTest {
private static final long ARBITRARY_LONG = 5L;
private static final TaskId TASK_ID = new TaskId(0, 0);
- private Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
+ private final Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
private final String threadId = Thread.currentThread().getName();
private final MetricName evictionTotalMetricLatest = new MetricName(
@@ -151,7 +152,7 @@ public class KTableSuppressProcessorMetricsTest {
processor.init(context);
final long timestamp = 100L;
- context.setRecordMetadata("", 0, 0L, null, timestamp);
+ context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
final String key = "longKey";
final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
processor.process(key, value);
@@ -174,7 +175,7 @@ public class KTableSuppressProcessorMetricsTest {
verifyMetric(metrics, bufferCountMaxMetric, is(1.0));
}
- context.setRecordMetadata("", 0, 1L, null, timestamp + 1);
+ context.setRecordMetadata("", 0, 1L, new RecordHeaders(), timestamp + 1);
processor.process("key", value);
{
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
index 778af9a..4019965 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals.suppress;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
@@ -101,7 +102,7 @@ public class KTableSuppressProcessorTest {
final MockInternalProcessorContext context = harness.context;
final long timestamp = ARBITRARY_LONG;
- context.setRecordMetadata("", 0, 0L, null, timestamp);
+ context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
final String key = "hey";
final Change<Long> value = ARBITRARY_CHANGE;
harness.processor.process(key, value);
@@ -119,7 +120,7 @@ public class KTableSuppressProcessorTest {
final MockInternalProcessorContext context = harness.context;
final long timestamp = ARBITRARY_LONG;
- context.setRecordMetadata("", 0, 0L, null, timestamp);
+ context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0L, 100L));
final Change<Long> value = ARBITRARY_CHANGE;
harness.processor.process(key, value);
@@ -137,13 +138,13 @@ public class KTableSuppressProcessorTest {
final MockInternalProcessorContext context = harness.context;
final long timestamp = 0L;
- context.setRecordMetadata("topic", 0, 0, null, timestamp);
+ context.setRecordMetadata("topic", 0, 0, new RecordHeaders(), timestamp);
final String key = "hey";
final Change<Long> value = new Change<>(null, 1L);
harness.processor.process(key, value);
assertThat(context.forwarded(), hasSize(0));
- context.setRecordMetadata("topic", 0, 1, null, 1L);
+ context.setRecordMetadata("topic", 0, 1, new RecordHeaders(), 1L);
harness.processor.process("tick", new Change<>(null, null));
assertThat(context.forwarded(), hasSize(1));
@@ -161,7 +162,7 @@ public class KTableSuppressProcessorTest {
final long windowStart = 99L;
final long recordTime = 99L;
final long windowEnd = 100L;
- context.setRecordMetadata("topic", 0, 0, null, recordTime);
+ context.setRecordMetadata("topic", 0, 0, new RecordHeaders(), recordTime);
final Windowed<String> key = new Windowed<>("hey", new TimeWindow(windowStart, windowEnd));
final Change<Long> value = ARBITRARY_CHANGE;
harness.processor.process(key, value);
@@ -172,7 +173,7 @@ public class KTableSuppressProcessorTest {
final long windowStart2 = 100L;
final long recordTime2 = 100L;
final long windowEnd2 = 101L;
- context.setRecordMetadata("topic", 0, 1, null, recordTime2);
+ context.setRecordMetadata("topic", 0, 1, new RecordHeaders(), recordTime2);
harness.processor.process(new Windowed<>("dummyKey1", new TimeWindow(windowStart2, windowEnd2)), ARBITRARY_CHANGE);
assertThat(context.forwarded(), hasSize(0));
@@ -180,7 +181,7 @@ public class KTableSuppressProcessorTest {
final long windowStart3 = 101L;
final long recordTime3 = 101L;
final long windowEnd3 = 102L;
- context.setRecordMetadata("topic", 0, 1, null, recordTime3);
+ context.setRecordMetadata("topic", 0, 1, new RecordHeaders(), recordTime3);
harness.processor.process(new Windowed<>("dummyKey2", new TimeWindow(windowStart3, windowEnd3)), ARBITRARY_CHANGE);
assertThat(context.forwarded(), hasSize(1));
@@ -204,13 +205,13 @@ public class KTableSuppressProcessorTest {
// even though the grace period is 0.
final long timestamp = 5L;
final long windowEnd = 100L;
- context.setRecordMetadata("", 0, 0L, null, timestamp);
+ context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, windowEnd));
final Change<Long> value = ARBITRARY_CHANGE;
harness.processor.process(key, value);
assertThat(context.forwarded(), hasSize(0));
- context.setRecordMetadata("", 0, 1L, null, windowEnd);
+ context.setRecordMetadata("", 0, 1L, new RecordHeaders(), windowEnd);
harness.processor.process(new Windowed<>("dummyKey", new
TimeWindow(windowEnd, windowEnd + 100L)), ARBITRARY_CHANGE);
assertThat(context.forwarded(), hasSize(1));
@@ -226,7 +227,7 @@ public class KTableSuppressProcessorTest {
final MockInternalProcessorContext context = harness.context;
final long timestamp = 100L;
- context.setRecordMetadata("", 0, 0L, null, timestamp);
+ context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L));
final Change<Long> value = ARBITRARY_CHANGE;
harness.processor.process(key, value);
@@ -248,7 +249,7 @@ public class KTableSuppressProcessorTest {
final MockInternalProcessorContext context = harness.context;
final long timestamp = 100L;
- context.setRecordMetadata("", 0, 0L, null, timestamp);
+ context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L));
final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
harness.processor.process(key, value);
@@ -268,7 +269,7 @@ public class KTableSuppressProcessorTest {
final MockInternalProcessorContext context = harness.context;
final long timestamp = 100L;
- context.setRecordMetadata("", 0, 0L, null, timestamp);
+ context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
final Windowed<String> key = new Windowed<>("hey", new SessionWindow(0L, 0L));
final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
harness.processor.process(key, value);
@@ -287,7 +288,7 @@ public class KTableSuppressProcessorTest {
final MockInternalProcessorContext context = harness.context;
final long timestamp = 100L;
- context.setRecordMetadata("", 0, 0L, null, timestamp);
+ context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0L, 100L));
final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
harness.processor.process(key, value);
@@ -310,7 +311,7 @@ public class KTableSuppressProcessorTest {
final MockInternalProcessorContext context = harness.context;
final long timestamp = 100L;
- context.setRecordMetadata("", 0, 0L, null, timestamp);
+ context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
final Windowed<String> key = new Windowed<>("hey", new SessionWindow(0L, 0L));
final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
harness.processor.process(key, value);
@@ -333,7 +334,7 @@ public class KTableSuppressProcessorTest {
final MockInternalProcessorContext context = harness.context;
final long timestamp = 100L;
- context.setRecordMetadata("", 0, 0L, null, timestamp);
+ context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
final String key = "hey";
final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
harness.processor.process(key, value);
@@ -351,12 +352,12 @@ public class KTableSuppressProcessorTest {
final MockInternalProcessorContext context = harness.context;
final long timestamp = 100L;
- context.setRecordMetadata("", 0, 0L, null, timestamp);
+ context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
final String key = "hey";
final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
harness.processor.process(key, value);
- context.setRecordMetadata("", 0, 1L, null, timestamp + 1);
+ context.setRecordMetadata("", 0, 1L, new RecordHeaders(), timestamp + 1);
harness.processor.process("dummyKey", value);
assertThat(context.forwarded(), hasSize(1));
@@ -372,12 +373,12 @@ public class KTableSuppressProcessorTest {
final MockInternalProcessorContext context = harness.context;
final long timestamp = 100L;
- context.setRecordMetadata("", 0, 0L, null, timestamp);
+ context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
final String key = "hey";
final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
harness.processor.process(key, value);
- context.setRecordMetadata("", 0, 1L, null, timestamp + 1);
+ context.setRecordMetadata("", 0, 1L, new RecordHeaders(), timestamp + 1);
harness.processor.process("dummyKey", value);
assertThat(context.forwarded(), hasSize(1));
@@ -393,13 +394,13 @@ public class KTableSuppressProcessorTest {
final MockInternalProcessorContext context = harness.context;
final long timestamp = 100L;
- context.setRecordMetadata("", 0, 0L, null, timestamp);
+ context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
context.setCurrentNode(new ProcessorNode("testNode"));
final String key = "hey";
final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
harness.processor.process(key, value);
- context.setRecordMetadata("", 0, 1L, null, timestamp);
+ context.setRecordMetadata("", 0, 1L, new RecordHeaders(), timestamp);
try {
harness.processor.process("dummyKey", value);
fail("expected an exception");
@@ -415,13 +416,13 @@ public class KTableSuppressProcessorTest {
final MockInternalProcessorContext context = harness.context;
final long timestamp = 100L;
- context.setRecordMetadata("", 0, 0L, null, timestamp);
+ context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
context.setCurrentNode(new ProcessorNode("testNode"));
final String key = "hey";
final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
harness.processor.process(key, value);
- context.setRecordMetadata("", 0, 1L, null, timestamp);
+ context.setRecordMetadata("", 0, 1L, new RecordHeaders(), timestamp);
try {
harness.processor.process("dummyKey", value);
fail("expected an exception");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index e4968e6..2e7c04c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -92,7 +92,7 @@ public class AbstractProcessorContextTest {
@Test
public void shouldNotThrowNullPointerExceptionOnTopicIfRecordContextTopicIsNull() {
- context.setRecordContext(new ProcessorRecordContext(0, 0, 0, null, null));
+ context.setRecordContext(new ProcessorRecordContext(0, 0, 0, null, new RecordHeaders()));
assertThat(context.topic(), nullValue());
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java
index 83ab127..68eb21e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java
@@ -21,20 +21,35 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
public class ProcessorRecordContextTest {
// timestamp + offset + partition: 8 + 8 + 4
private final static long MIN_SIZE = 20L;
@Test
- public void shouldEstimateNullTopicAndNullHeadersAsZeroLength() {
+ public void shouldNotAllowNullHeaders() {
+ assertThrows(
+ NullPointerException.class,
+ () -> new ProcessorRecordContext(
+ 42L,
+ 73L,
+ 0,
+ "topic",
+ null
+ )
+ );
+ }
+
+ @Test
+ public void shouldEstimateNullTopicAndEmptyHeadersAsZeroLength() {
final Headers headers = new RecordHeaders();
final ProcessorRecordContext context = new ProcessorRecordContext(
42L,
73L,
0,
null,
- null
+ new RecordHeaders()
);
assertEquals(MIN_SIZE, context.residentMemorySizeEstimate());
@@ -60,7 +75,7 @@ public class ProcessorRecordContextTest {
73L,
0,
"topic",
- null
+ new RecordHeaders()
);
assertEquals(MIN_SIZE + 5L, context.residentMemorySizeEstimate());
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
index 25fbc2f..2452acf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
@@ -1218,6 +1219,6 @@ public abstract class AbstractWindowBytesStoreTest {
}
private ProcessorRecordContext createRecordContext(final long time) {
- return new ProcessorRecordContext(time, 0, 0, "topic", null);
+ return new ProcessorRecordContext(time, 0, 0, "topic", new RecordHeaders());
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java
index ad9b5f8..a8cc5ac 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.junit.Test;
@@ -81,7 +82,7 @@ public class BufferValueTest {
@Test
public void shouldAccountForDeduplicationInSizeEstimate() {
- final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+ final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", new RecordHeaders());
assertEquals(25L, new BufferValue(null, null, null, context).residentMemorySizeEstimate());
assertEquals(26L, new BufferValue(new byte[] {(byte) 0}, null, null, context).residentMemorySizeEstimate());
assertEquals(26L, new BufferValue(null, new byte[] {(byte) 0}, null, context).residentMemorySizeEstimate());
@@ -94,7 +95,7 @@ public class BufferValueTest {
@Test
public void shouldSerializeNulls() {
-        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", new RecordHeaders());
         final byte[] serializedContext = context.serialize();
         final byte[] bytes = new BufferValue(null, null, null, context).serialize(0).array();
         final byte[] withoutContext = Arrays.copyOfRange(bytes, serializedContext.length, bytes.length);
@@ -104,7 +105,7 @@ public class BufferValueTest {
@Test
public void shouldSerializePrior() {
-        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", new RecordHeaders());
         final byte[] serializedContext = context.serialize();
         final byte[] priorValue = {(byte) 5};
         final byte[] bytes = new BufferValue(priorValue, null, null, context).serialize(0).array();
@@ -115,7 +116,7 @@ public class BufferValueTest {
@Test
public void shouldSerializeOld() {
-        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", new RecordHeaders());
         final byte[] serializedContext = context.serialize();
         final byte[] oldValue = {(byte) 5};
         final byte[] bytes = new BufferValue(null, oldValue, null, context).serialize(0).array();
@@ -126,7 +127,7 @@ public class BufferValueTest {
@Test
public void shouldSerializeNew() {
-        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", new RecordHeaders());
         final byte[] serializedContext = context.serialize();
         final byte[] newValue = {(byte) 5};
         final byte[] bytes = new BufferValue(null, null, newValue, context).serialize(0).array();
@@ -137,7 +138,7 @@ public class BufferValueTest {
@Test
public void shouldCompactDuplicates() {
-        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", new RecordHeaders());
         final byte[] serializedContext = context.serialize();
         final byte[] duplicate = {(byte) 5};
         final byte[] bytes = new BufferValue(duplicate, duplicate, null, context).serialize(0).array();
@@ -148,7 +149,7 @@ public class BufferValueTest {
@Test
public void shouldDeserializePrior() {
-        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", new RecordHeaders());
final byte[] serializedContext = context.serialize();
final byte[] priorValue = {(byte) 5};
final ByteBuffer serialValue =
@@ -163,7 +164,7 @@ public class BufferValueTest {
@Test
public void shouldDeserializeOld() {
-        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", new RecordHeaders());
final byte[] serializedContext = context.serialize();
final byte[] oldValue = {(byte) 5};
final ByteBuffer serialValue =
@@ -177,7 +178,7 @@ public class BufferValueTest {
@Test
public void shouldDeserializeNew() {
-        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", new RecordHeaders());
final byte[] serializedContext = context.serialize();
final byte[] newValue = {(byte) 5};
final ByteBuffer serialValue =
@@ -191,7 +192,7 @@ public class BufferValueTest {
@Test
public void shouldDeserializeCompactedDuplicates() {
-        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", new RecordHeaders());
final byte[] serializedContext = context.serialize();
final byte[] duplicate = {(byte) 5};
final ByteBuffer serialValue =
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
index ff78642..e702151 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -75,7 +76,7 @@ public class CachingInMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest
         store.setFlushListener(cacheFlushListener, false);
         cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, new MockStreamsMetrics(new Metrics()));
         context = new InternalMockProcessorContext<>(null, null, null, null, cache);
-        context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
+        context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, new RecordHeaders()));
store.init((StateStoreContext) context, null);
}
@@ -201,7 +202,7 @@ public class CachingInMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest
         store = new CachingKeyValueStore(underlyingStore);
         cache = EasyMock.niceMock(ThreadCache.class);
         context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
-        context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
+        context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, new RecordHeaders()));
store.init((StateStoreContext) context, store);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java
index 417b35f..67258e7 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
@@ -89,7 +90,7 @@ public class CachingInMemorySessionStoreTest {
         cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL);
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
         context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
-        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, null));
+        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, new RecordHeaders()));
cachingStore.init((StateStoreContext) context, cachingStore);
}
@@ -225,7 +226,7 @@ public class CachingInMemorySessionStoreTest {
         cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL);
         cache = EasyMock.niceMock(ThreadCache.class);
         final InternalMockProcessorContext context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
-        context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
+        context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, new RecordHeaders()));
cachingStore.init((StateStoreContext) context, cachingStore);
}
@@ -312,7 +313,7 @@ public class CachingInMemorySessionStoreTest {
         final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.findSessions(
             Bytes.wrap("a".getBytes(StandardCharsets.UTF_8)),
             0,
-            added.size() * 10);
+            added.size() * 10L);
         final List<KeyValue<Windowed<Bytes>, byte[]>> actual = toList(iterator);
verifyKeyValueList(added, actual);
}
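The 10 -> 10L change keeps the time-bound arithmetic in long from the start (the 5L multiplications in ThreadCacheTest below follow the same pattern): added.size() is an int, so added.size() * 10 is evaluated in int arithmetic and only then widened to the method's long parameter, silently overflowing for very large sizes. A tiny illustrative sketch with a hypothetical size:

    final int size = Integer.MAX_VALUE / 2;  // hypothetical large collection size
    final long intMath = size * 10;          // int overflow first: 2147483638 (wrong)
    final long longMath = size * 10L;        // long arithmetic: 10737418230 (correct)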
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
index 55018bf..8840da4 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
@@ -94,7 +95,7 @@ public class CachingPersistentSessionStoreTest {
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
         final InternalMockProcessorContext context =
             new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
-        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, null));
+        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, new RecordHeaders()));
cachingStore.init((StateStoreContext) context, cachingStore);
}
@@ -211,7 +212,7 @@ public class CachingPersistentSessionStoreTest {
cache = EasyMock.niceMock(ThreadCache.class);
         final InternalMockProcessorContext context =
             new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
-        context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
+        context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, new RecordHeaders()));
cachingStore.init((StateStoreContext) context, cachingStore);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
index ef9345b..8bdf8b7 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -104,7 +105,7 @@ public class CachingPersistentWindowStoreTest {
cachingStore.setFlushListener(cacheListener, false);
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
         context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
-        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, null));
+        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, new RecordHeaders()));
cachingStore.init((StateStoreContext) context, cachingStore);
}
@@ -907,7 +908,7 @@ public class CachingPersistentWindowStoreTest {
         cachingStore = new CachingWindowStore(underlyingStore, WINDOW_SIZE, SEGMENT_INTERVAL);
         cache = EasyMock.createNiceMock(ThreadCache.class);
         context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
-        context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
+        context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, new RecordHeaders()));
cachingStore.init((StateStoreContext) context, cachingStore);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
index 081ce21..6d43b4c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
@@ -43,16 +43,12 @@ public class NamedCacheTest {
     private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
     private NamedCache cache;
-    private Metrics innerMetrics;
-    private StreamsMetricsImpl metrics;
-    private final String taskIDString = "0.0";
-    private final String underlyingStoreName = "storeName";
     @Before
     public void setUp() {
-        innerMetrics = new Metrics();
-        metrics = new MockStreamsMetrics(innerMetrics);
-        cache = new NamedCache(taskIDString + "-" + underlyingStoreName, metrics);
+        final Metrics innerMetrics = new Metrics();
+        final StreamsMetricsImpl metrics = new MockStreamsMetrics(innerMetrics);
+        cache = new NamedCache("dummy-name", metrics);
}
@Test
@@ -63,13 +59,14 @@ public class NamedCacheTest {
new KeyValue<>("K3", "V3"),
new KeyValue<>("K4", "V4"),
new KeyValue<>("K5", "V5"));
-        for (int i = 0; i < toInsert.size(); i++) {
-            final byte[] key = toInsert.get(i).key.getBytes();
-            final byte[] value = toInsert.get(i).value.getBytes();
-            cache.put(Bytes.wrap(key), new LRUCacheEntry(value, null, true, 1, 1, 1, ""));
+        for (final KeyValue<String, String> stringStringKeyValue : toInsert) {
+            final byte[] key = stringStringKeyValue.key.getBytes();
+            final byte[] value = stringStringKeyValue.value.getBytes();
+            cache.put(Bytes.wrap(key),
+                new LRUCacheEntry(value, new RecordHeaders(), true, 1, 1, 1, ""));
final LRUCacheEntry head = cache.first();
final LRUCacheEntry tail = cache.last();
- assertEquals(new String(head.value()), toInsert.get(i).value);
+ assertEquals(new String(head.value()), stringStringKeyValue.value);
assertEquals(new String(tail.value()), toInsert.get(0).value);
assertEquals(cache.flushes(), 0);
assertEquals(cache.hits(), 0);
@@ -158,12 +155,7 @@ public class NamedCacheTest {
         cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}));
         cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, headers, true, 0, 0, 0, ""));
-        cache.setListener(new ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
-                flushed.addAll(dirty);
-            }
-        });
+ cache.setListener(flushed::addAll);
cache.evict();
@@ -185,19 +177,14 @@ public class NamedCacheTest {
     public void shouldThrowIllegalStateExceptionWhenTryingToOverwriteDirtyEntryWithCleanEntry() {
         cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, headers, true, 0, 0, 0, ""));
         assertThrows(IllegalStateException.class, () -> cache.put(Bytes.wrap(new byte[]{0}),
-            new LRUCacheEntry(new byte[]{10}, null, false, 0, 0, 0, "")));
+            new LRUCacheEntry(new byte[]{10}, new RecordHeaders(), false, 0, 0, 0, "")));
}
@Test
public void shouldRemoveDeletedValuesOnFlush() {
-        cache.setListener(new ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
-                // no-op
-            }
-        });
+        cache.setListener(dirty -> { /* no-op */ });
         cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(null, headers, true, 0, 0, 0, ""));
-        cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}, null, true, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}, new RecordHeaders(), true, 0, 0, 0, ""));
cache.flush();
assertEquals(1, cache.size());
assertNotNull(cache.get(Bytes.wrap(new byte[]{1})));
@@ -205,21 +192,18 @@ public class NamedCacheTest {
@Test
public void shouldBeReentrantAndNotBreakLRU() {
-        final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, null, true, 0, 0, 0, "");
+        final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, new RecordHeaders(), true, 0, 0, 0, "");
final LRUCacheEntry clean = new LRUCacheEntry(new byte[]{3});
cache.put(Bytes.wrap(new byte[]{0}), dirty);
cache.put(Bytes.wrap(new byte[]{1}), clean);
cache.put(Bytes.wrap(new byte[]{2}), clean);
assertEquals(3 * cache.head().size(), cache.sizeInBytes());
- cache.setListener(new ThreadCache.DirtyEntryFlushListener() {
- @Override
- public void apply(final List<ThreadCache.DirtyEntry> dirty) {
- cache.put(Bytes.wrap(new byte[]{3}), clean);
- // evict key 1
- cache.evict();
- // evict key 2
- cache.evict();
- }
+ cache.setListener(dirty1 -> {
+ cache.put(Bytes.wrap(new byte[]{3}), clean);
+ // evict key 1
+ cache.evict();
+ // evict key 2
+ cache.evict();
});
assertEquals(3 * cache.head().size(), cache.sizeInBytes());
@@ -251,15 +235,10 @@ public class NamedCacheTest {
@Test
     public void shouldNotThrowIllegalArgumentAfterEvictingDirtyRecordAndThenPuttingNewRecordWithSameKey() {
-        final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, null, true, 0, 0, 0, "");
+        final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, new RecordHeaders(), true, 0, 0, 0, "");
final LRUCacheEntry clean = new LRUCacheEntry(new byte[]{3});
final Bytes key = Bytes.wrap(new byte[] {3});
- cache.setListener(new ThreadCache.DirtyEntryFlushListener() {
- @Override
- public void apply(final List<ThreadCache.DirtyEntry> dirty) {
- cache.put(key, clean);
- }
- });
+ cache.setListener(dirty1 -> cache.put(key, clean));
cache.put(key, dirty);
cache.evict();
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index c449de9..805d295 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
@@ -63,7 +64,7 @@ public class ThreadCacheTest {
for (final KeyValue<String, String> kvToInsert : toInsert) {
final Bytes key = Bytes.wrap(kvToInsert.key.getBytes());
final byte[] value = kvToInsert.value.getBytes();
-            cache.put(namespace, key, new LRUCacheEntry(value, null, true, 1L, 1L, 1, ""));
+            cache.put(namespace, key, new LRUCacheEntry(value, new RecordHeaders(), true, 1L, 1L, 1, ""));
}
for (final KeyValue<String, String> kvToInsert : toInsert) {
@@ -96,7 +97,7 @@ public class ThreadCacheTest {
final String keyStr = "K" + i;
final Bytes key = Bytes.wrap(keyStr.getBytes());
final byte[] value = new byte[valueSizeBytes];
-            cache.put(namespace, key, new LRUCacheEntry(value, null, true, 1L, 1L, 1, ""));
+            cache.put(namespace, key, new LRUCacheEntry(value, new RecordHeaders(), true, 1L, 1L, 1, ""));
}
@@ -134,7 +135,7 @@ public class ThreadCacheTest {
}
-    static int memoryCacheEntrySize(final byte[] key, final byte[] value, final String topic) {
+    static long memoryCacheEntrySize(final byte[] key, final byte[] value, final String topic) {
return key.length +
value.length +
1 + // isDirty
@@ -174,7 +175,7 @@ public class ThreadCacheTest {
for (final KeyValue<String, String> kvToInsert : toInsert) {
final Bytes key = Bytes.wrap(kvToInsert.key.getBytes());
final byte[] value = kvToInsert.value.getBytes();
-            cache.put(namespace, key, new LRUCacheEntry(value, null, true, 1, 1, 1, ""));
+            cache.put(namespace, key, new LRUCacheEntry(value, new RecordHeaders(), true, 1, 1, 1, ""));
}
for (int i = 0; i < expected.size(); i++) {
@@ -363,8 +364,8 @@ public class ThreadCacheTest {
@Test
public void shouldSkipEntriesWhereValueHasBeenEvictedFromCache() {
-        final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
-        final ThreadCache cache = setupThreadCache(0, 5, entrySize * 5, false);
+        final long entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
+        final ThreadCache cache = setupThreadCache(0, 5, entrySize * 5L, false);
         assertEquals(5, cache.size());
         // should evict byte[] {0}
         cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6}));
@@ -374,8 +375,8 @@ public class ThreadCacheTest {
@Test
     public void shouldSkipEntriesWhereValueHasBeenEvictedFromCacheReverseRange() {
-        final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
-        final ThreadCache cache = setupThreadCache(4, 0, entrySize * 5, true);
+        final long entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
+        final ThreadCache cache = setupThreadCache(4, 0, entrySize * 5L, true);
         assertEquals(5, cache.size());
         // should evict byte[] {4}
         cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6}));
@@ -415,8 +416,8 @@ public class ThreadCacheTest {
@Test
public void shouldReturnAllUnevictedValuesFromCache() {
-        final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
-        final ThreadCache cache = setupThreadCache(0, 5, entrySize * 5, false);
+        final long entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
+        final ThreadCache cache = setupThreadCache(0, 5, entrySize * 5L, false);
         assertEquals(5, cache.size());
         // should evict byte[] {0}
         cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6}));
@@ -426,8 +427,8 @@ public class ThreadCacheTest {
@Test
public void shouldReturnAllUnevictedValuesFromCacheInReverseOrder() {
-        final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
-        final ThreadCache cache = setupThreadCache(4, 0, entrySize * 5, true);
+        final long entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
+        final ThreadCache cache = setupThreadCache(4, 0, entrySize * 5L, true);
         assertEquals(5, cache.size());
         // should evict byte[] {4}
         cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6}));
@@ -615,7 +616,7 @@ public class ThreadCacheTest {
}
private LRUCacheEntry dirtyEntry(final byte[] key) {
-        return new LRUCacheEntry(key, null, true, -1, -1, -1, "");
+        return new LRUCacheEntry(key, new RecordHeaders(), true, -1, -1, -1, "");
}
private LRUCacheEntry cleanEntry(final byte[] key) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
index 7d420c7..26e9488 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -370,7 +370,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", new RecordHeaders()));
// These serialized formats were captured by running version 2.1 code.
// They verify that an upgrade from 2.1 will work.
@@ -489,7 +489,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", new RecordHeaders()));
         final RecordHeaders v1FlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})});
@@ -611,7 +611,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", new RecordHeaders()));
         final RecordHeaders v2FlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})});
@@ -735,7 +735,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", new RecordHeaders()));
         final RecordHeaders headers = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})});
@@ -856,7 +856,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", new RecordHeaders()));
         final RecordHeaders headers = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 3})});
@@ -977,7 +977,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", new RecordHeaders()));
         final RecordHeaders unknownFlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) -1})});
@@ -1025,7 +1025,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     }
     private static ProcessorRecordContext getContext(final long recordTimestamp) {
-        return new ProcessorRecordContext(recordTimestamp, 0, 0, "topic", null);
+        return new ProcessorRecordContext(recordTimestamp, 0, 0, "topic", new RecordHeaders());
}
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index b5ced0f..577f94f 100644
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -423,6 +423,18 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
return offset;
}
+    /**
+     * Return the headers of the current input record; may be {@code null} if the headers are not available.
+     *
+     * <p> Note that headers should never be {@code null} in the actual Kafka Streams runtime,
+     * even if they may be empty. However, this mock does not guarantee non-{@code null} headers.
+     * Thus, to use this mock for testing, you either need to add a {@code null} check to your
+     * production code, or you need to always set headers manually via {@link #setHeaders(Headers)}
+     * to avoid a {@link NullPointerException} from your {@link Processor} implementation.
+     *
+     * @return the headers
+     */
@Override
public Headers headers() {
return headers;
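As a hedged usage sketch of the contract documented above (assuming the mock's no-argument constructor; setHeaders and setRecordMetadata are the setters referenced in the JavaDoc and in the test change below), tests should set non-null headers explicitly whenever the processor under test reads them:

    import org.apache.kafka.common.header.internals.RecordHeaders;
    import org.apache.kafka.streams.processor.MockProcessorContext;

    final MockProcessorContext context = new MockProcessorContext();
    // set all record metadata at once, including non-null (empty) headers ...
    context.setRecordMetadata("topic", 0, 0L, new RecordHeaders(), 0L);
    // ... or update just the headers
    context.setHeaders(new RecordHeaders());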
diff --git
a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index ae345ea..6e3d931 100644
---
a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++
b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.MockProcessorContext;
@@ -253,7 +254,7 @@ public class MockProcessorContextTest {
}
context.resetForwards();
- context.setRecordMetadata("t1", 0, 0L, null, 0L);
+ context.setRecordMetadata("t1", 0, 0L, new RecordHeaders(), 0L);
{
processor.process("foo", 5L);