This is an automated email from the ASF dual-hosted git repository.

frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5fc5d7591c5 KAFKA-20134: Implement TimestampedWindowStoreWithHeaders 
(N/N) (#21581)
5fc5d7591c5 is described below

commit 5fc5d7591c515f91dd8b1a92fec40aacb1a31f1a
Author: TengYao Chi <[email protected]>
AuthorDate: Fri Mar 6 18:09:42 2026 +0000

    KAFKA-20134: Implement TimestampedWindowStoreWithHeaders (N/N) (#21581)
    
    This PR implements the upgrade integration tests for
    `TimestampedWindowStoreWithHeaders` introduced in KIP-1271.
    
    The class to be reviewed: `HeadersStoreUpgradeIntegrationTest`
    
    This should not be merged before #21497
    
    Reviewers: Matthias J. Sax <[email protected]>, Alieh Saeedi
    <[email protected]>
---
 .../HeadersStoreUpgradeIntegrationTest.java        | 472 +++++++++++++++++++++
 1 file changed, 472 insertions(+)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
index 9b6371f5a19..2976f0713fe 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
@@ -25,17 +25,23 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.ValueTimestampHeaders;
 import org.apache.kafka.test.TestUtils;
@@ -50,14 +56,23 @@ import org.junit.jupiter.api.TestInfo;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Properties;
 
+import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
 import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 @Tag("integration")
 public class HeadersStoreUpgradeIntegrationTest {
     private static final String STORE_NAME = "store";
+    private static final String WINDOW_STORE_NAME = "window-store";
+    private static final long WINDOW_SIZE_MS = 1000L;
+    private static final long RETENTION_MS = Duration.ofDays(1).toMillis();
+
     private String inputStream;
 
     private KafkaStreams kafkaStreams;
@@ -349,4 +364,461 @@ public class HeadersStoreUpgradeIntegrationTest {
             store.put(record.key(), ValueTimestampHeaders.make(record.value(), 
record.timestamp(), record.headers()));
         }
     }
+
+    /**
+     * Runs the window-store migration scenario with the in-memory store variant.
+     */
+    @Test
+    public void shouldMigrateInMemoryTimestampedWindowStoreToTimestampedWindowStoreWithHeaders() throws Exception {
+        final boolean persistentStore = false;
+        shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(persistentStore);
+    }
+
+    /**
+     * Runs the window-store migration scenario with the persistent store variant.
+     */
+    @Test
+    public void shouldMigratePersistentTimestampedWindowStoreToTimestampedWindowStoreWithHeaders() throws Exception {
+        final boolean persistentStore = true;
+        shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(persistentStore);
+    }
+
+    /**
+     * Tests migration from TimestampedWindowStore to TimestampedWindowStoreWithHeaders.
+     * This is a true migration where both supplier and builder are upgraded.
+     *
+     * @param persistentStore if {@code true}, both phases use the persistent store suppliers;
+     *                        otherwise both phases use the in-memory window store supplier
+     * @throws Exception if producing records or waiting for a verification condition fails
+     */
+    private void shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(final boolean persistentStore) throws Exception {
+        // Phase 1: Run with old TimestampedWindowStore
+        final StreamsBuilder oldBuilder = new StreamsBuilder();
+        oldBuilder.addStateStore(
+                Stores.timestampedWindowStoreBuilder(
+                    persistentStore
+                        ? Stores.persistentTimestampedWindowStore(
+                            WINDOW_STORE_NAME,
+                            Duration.ofMillis(RETENTION_MS),
+                            Duration.ofMillis(WINDOW_SIZE_MS),
+                            false)
+                        : Stores.inMemoryWindowStore(
+                            WINDOW_STORE_NAME,
+                            Duration.ofMillis(RETENTION_MS),
+                            Duration.ofMillis(WINDOW_SIZE_MS),
+                            false),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), Serdes.String()))
+            .process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+        kafkaStreams.start();
+
+        final long baseTime = CLUSTER.time.milliseconds();
+        processWindowedKeyValueAndVerifyTimestamped("key1", "value1", baseTime + 100);
+        processWindowedKeyValueAndVerifyTimestamped("key2", "value2", baseTime + 200);
+        processWindowedKeyValueAndVerifyTimestamped("key3", "value3", baseTime + 300);
+
+        kafkaStreams.close();
+        kafkaStreams = null;
+
+        // Phase 2: Restart the same application (same props) with the headers-aware store
+        final StreamsBuilder newBuilder = new StreamsBuilder();
+        newBuilder.addStateStore(
+                Stores.timestampedWindowStoreWithHeadersBuilder(
+                    persistentStore
+                        ? Stores.persistentTimestampedWindowStoreWithHeaders(
+                            WINDOW_STORE_NAME,
+                            Duration.ofMillis(RETENTION_MS),
+                            Duration.ofMillis(WINDOW_SIZE_MS),
+                            false)
+                        : Stores.inMemoryWindowStore(
+                            WINDOW_STORE_NAME,
+                            Duration.ofMillis(RETENTION_MS),
+                            Duration.ofMillis(WINDOW_SIZE_MS),
+                            false),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), Serdes.String()))
+            .process(TimestampedWindowedWithHeadersProcessor::new, WINDOW_STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+        kafkaStreams.start();
+
+        // Records written in phase 1 must be readable with empty headers
+        verifyWindowValueWithEmptyHeaders("key1", "value1", baseTime + 100);
+        verifyWindowValueWithEmptyHeaders("key2", "value2", baseTime + 200);
+        verifyWindowValueWithEmptyHeaders("key3", "value3", baseTime + 300);
+
+        // Phase 3: Write new records with headers; encode header values with an explicit
+        // charset so the bytes do not depend on the platform default
+        final Headers headers = new RecordHeaders();
+        headers.add("source", "migration-test".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        headers.add("version", "1.0".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+
+        // The written headers are also the headers expected when reading back
+        processWindowedKeyValueWithHeadersAndVerify("key3", "value3-updated", baseTime + 350, headers, headers);
+        processWindowedKeyValueWithHeadersAndVerify("key4", "value4", baseTime + 400, headers, headers);
+
+        kafkaStreams.close();
+    }
+
+    /**
+     * Tests the proxy/adapter mode: the store is first run as a plain TimestampedWindowStore and
+     * then restarted with the headers-aware builder on top of the same non-headers supplier.
+     *
+     * @throws Exception if producing records or waiting for a verification condition fails
+     */
+    @Test
+    public void shouldProxyTimestampedWindowStoreToTimestampedWindowStoreWithHeaders() throws Exception {
+        final StreamsBuilder oldBuilder = new StreamsBuilder();
+        oldBuilder.addStateStore(
+                Stores.timestampedWindowStoreBuilder(
+                    Stores.persistentTimestampedWindowStore(
+                        WINDOW_STORE_NAME,
+                        Duration.ofMillis(RETENTION_MS),
+                        Duration.ofMillis(WINDOW_SIZE_MS),
+                        false),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), Serdes.String()))
+            .process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+        kafkaStreams.start();
+
+        final long baseTime = CLUSTER.time.milliseconds();
+        processWindowedKeyValueAndVerifyTimestamped("key1", "value1", baseTime + 100);
+        processWindowedKeyValueAndVerifyTimestamped("key2", "value2", baseTime + 200);
+        processWindowedKeyValueAndVerifyTimestamped("key3", "value3", baseTime + 300);
+
+        kafkaStreams.close();
+        kafkaStreams = null;
+
+        // Restart with headers-aware builder but non-headers supplier (proxy/adapter mode)
+        final StreamsBuilder newBuilder = new StreamsBuilder();
+        newBuilder.addStateStore(
+                Stores.timestampedWindowStoreWithHeadersBuilder(
+                    // non-headers supplier!
+                    Stores.persistentTimestampedWindowStore(
+                        WINDOW_STORE_NAME,
+                        Duration.ofMillis(RETENTION_MS),
+                        Duration.ofMillis(WINDOW_SIZE_MS),
+                        false),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), Serdes.String()))
+            .process(TimestampedWindowedWithHeadersProcessor::new, WINDOW_STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+        kafkaStreams.start();
+
+        verifyWindowValueWithEmptyHeaders("key1", "value1", baseTime + 100);
+        verifyWindowValueWithEmptyHeaders("key2", "value2", baseTime + 200);
+        verifyWindowValueWithEmptyHeaders("key3", "value3", baseTime + 300);
+
+        // Encode the header value with an explicit charset so the bytes do not depend on the
+        // platform default
+        final RecordHeaders headers = new RecordHeaders();
+        headers.add("source", "proxy-test".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+
+        // In proxy mode, headers are stripped when writing to non-headers store
+        // So we expect empty headers when reading back
+        final Headers expectedHeaders = new RecordHeaders();
+
+        processWindowedKeyValueWithHeadersAndVerify("key3", "value3-updated", baseTime + 350, headers, expectedHeaders);
+        processWindowedKeyValueWithHeadersAndVerify("key4", "value4", baseTime + 400, headers, expectedHeaders);
+
+        kafkaStreams.close();
+    }
+
+    /**
+     * Produces one record to the input topic, then polls the timestamped window store until the
+     * record's window holds the given value with the given timestamp.
+     *
+     * @param key       record key, also used for the store lookup
+     * @param value     record value expected in the store
+     * @param timestamp record timestamp; the window start is derived by aligning it down to WINDOW_SIZE_MS
+     */
+    private void processWindowedKeyValueAndVerifyTimestamped(final String key,
+                                                             final String value,
+                                                             final long timestamp) throws Exception {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputStream,
+            singletonList(KeyValue.pair(key, value)),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class),
+            timestamp,
+            false);
+
+        TestUtils.waitForCondition(() -> {
+            try {
+                final ReadOnlyWindowStore<String, ValueAndTimestamp<String>> windowStore =
+                    IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, QueryableStoreTypes.timestampedWindowStore());
+                if (windowStore == null) {
+                    return false;
+                }
+
+                final long alignedWindowStart = timestamp - (timestamp % WINDOW_SIZE_MS);
+                final ValueAndTimestamp<String> stored = windowStore.fetch(key, alignedWindowStart);
+                if (stored == null) {
+                    return false;
+                }
+                if (!stored.value().equals(value)) {
+                    return false;
+                }
+                return stored.timestamp() == timestamp;
+            } catch (final Exception e) {
+                // Any failure while querying counts as "not yet"; the condition is polled again
+                return false;
+            }
+        }, 60_000L, "Could not verify timestamped value in time.");
+    }
+
+    /**
+     * Produces one record carrying {@code headers}, then polls the window store until the entry for
+     * the record's key and window holds the given value, timestamp and {@code expectedHeaders}.
+     *
+     * @param key             record key to produce and to look for
+     * @param value           record value expected in the store
+     * @param timestamp       record timestamp; the window start is derived by aligning it down to WINDOW_SIZE_MS
+     * @param headers         headers attached to the produced record
+     * @param expectedHeaders headers the stored entry must equal
+     */
+    private void processWindowedKeyValueWithHeadersAndVerify(final String key,
+                                                              final String value,
+                                                              final long timestamp,
+                                                              final Headers headers,
+                                                              final Headers expectedHeaders) throws Exception {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputStream,
+            singletonList(KeyValue.pair(key, value)),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class),
+            headers,
+            timestamp,
+            false);
+
+        TestUtils.waitForCondition(() -> {
+            try {
+                final ReadOnlyWindowStore<String, ValueTimestampHeaders<String>> windowStore =
+                    IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, QueryableStoreTypes.windowStore());
+                if (windowStore == null) {
+                    return false;
+                }
+
+                final long expectedWindowStart = timestamp - (timestamp % WINDOW_SIZE_MS);
+
+                // Collect every entry of the store that belongs to this key and window
+                final List<KeyValue<Windowed<String>, ValueTimestampHeaders<String>>> matches = new LinkedList<>();
+                try (final KeyValueIterator<Windowed<String>, ValueTimestampHeaders<String>> entries = windowStore.all()) {
+                    while (entries.hasNext()) {
+                        final KeyValue<Windowed<String>, ValueTimestampHeaders<String>> entry = entries.next();
+                        final boolean sameKey = entry.key.key().equals(key);
+                        if (sameKey && entry.key.window().start() == expectedWindowStart) {
+                            matches.add(entry);
+                        }
+                    }
+                }
+                if (matches.isEmpty()) {
+                    return false;
+                }
+
+                // Only the first matching entry is checked
+                final ValueTimestampHeaders<String> stored = matches.get(0).value;
+                if (stored == null) {
+                    return false;
+                }
+                if (!stored.value().equals(value)) {
+                    return false;
+                }
+                if (stored.timestamp() != timestamp) {
+                    return false;
+                }
+                return stored.headers().equals(expectedHeaders);
+            } catch (final Exception e) {
+                e.printStackTrace();
+                return false;
+            }
+        }, 60_000L, "Could not verify windowed value with headers in time.");
+    }
+
+    /**
+     * Polls the window store until an entry for the given key and window is present, then asserts
+     * that it carries the given value and timestamp together with non-null, empty headers.
+     *
+     * @param key       key to look for
+     * @param value     value the entry must hold
+     * @param timestamp timestamp the entry must hold; the window start is derived by aligning it
+     *                  down to WINDOW_SIZE_MS
+     */
+    private void verifyWindowValueWithEmptyHeaders(final String key,
+                                                    final String value,
+                                                    final long timestamp) throws Exception {
+        TestUtils.waitForCondition(() -> {
+            try {
+                final ReadOnlyWindowStore<String, ValueTimestampHeaders<String>> windowStore =
+                    IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, QueryableStoreTypes.windowStore());
+                if (windowStore == null) {
+                    return false;
+                }
+
+                final long expectedWindowStart = timestamp - (timestamp % WINDOW_SIZE_MS);
+
+                // Collect every entry of the store that belongs to this key and window
+                final List<KeyValue<Windowed<String>, ValueTimestampHeaders<String>>> matches = new LinkedList<>();
+                try (final KeyValueIterator<Windowed<String>, ValueTimestampHeaders<String>> entries = windowStore.all()) {
+                    while (entries.hasNext()) {
+                        final KeyValue<Windowed<String>, ValueTimestampHeaders<String>> entry = entries.next();
+                        final boolean sameKey = entry.key.key().equals(key);
+                        if (sameKey && entry.key.window().start() == expectedWindowStart) {
+                            matches.add(entry);
+                        }
+                    }
+                }
+                if (matches.isEmpty()) {
+                    return false;
+                }
+
+                final ValueTimestampHeaders<String> stored = matches.get(0).value;
+                assertNotNull(stored, "Result should not be null");
+                assertEquals(value, stored.value(), "Value should match");
+                assertEquals(timestamp, stored.timestamp(), "Timestamp should match");
+
+                // Verify headers exist but are empty (migrated from timestamped store without headers)
+                assertNotNull(stored.headers(), "Headers should not be null for migrated data");
+                assertEquals(0, stored.headers().toArray().length, "Headers should be empty for migrated data");
+
+                return true;
+            } catch (final Exception e) {
+                e.printStackTrace();
+                return false;
+            }
+        }, 60_000L, "Could not verify legacy value with empty headers in time.");
+    }
+
+    /**
+     * Processor for TimestampedWindowStore (without headers). Each record is stored under the
+     * start of the WINDOW_SIZE_MS-aligned window that contains the record timestamp.
+     */
+    private static class TimestampedWindowedProcessor implements Processor<String, String, Void, Void> {
+        private TimestampedWindowStore<String, String> store;
+
+        @Override
+        public void init(final ProcessorContext<Void, Void> context) {
+            store = context.getStateStore(WINDOW_STORE_NAME);
+        }
+
+        @Override
+        public void process(final Record<String, String> record) {
+            final long recordTimestamp = record.timestamp();
+            final long alignedWindowStart = recordTimestamp - (recordTimestamp % WINDOW_SIZE_MS);
+            final ValueAndTimestamp<String> payload = ValueAndTimestamp.make(record.value(), recordTimestamp);
+            store.put(record.key(), payload, alignedWindowStart);
+        }
+    }
+
+    /**
+     * Processor for TimestampedWindowStoreWithHeaders (with headers). Each record is stored with
+     * its value, timestamp and headers under the start of the WINDOW_SIZE_MS-aligned window that
+     * contains the record timestamp.
+     */
+    private static class TimestampedWindowedWithHeadersProcessor implements Processor<String, String, Void, Void> {
+        private TimestampedWindowStoreWithHeaders<String, String> store;
+
+        @Override
+        public void init(final ProcessorContext<Void, Void> context) {
+            store = context.getStateStore(WINDOW_STORE_NAME);
+        }
+
+        @Override
+        public void process(final Record<String, String> record) {
+            final long recordTimestamp = record.timestamp();
+            final long alignedWindowStart = recordTimestamp - (recordTimestamp % WINDOW_SIZE_MS);
+            final ValueTimestampHeaders<String> payload =
+                ValueTimestampHeaders.make(record.value(), recordTimestamp, record.headers());
+            store.put(record.key(), payload, alignedWindowStart);
+        }
+    }
+
+    /**
+     * Populates a headers-aware window store, then restarts the application with the non-headers
+     * store on the same local state and expects a ProcessorStateException whose message mentions
+     * both "headers-aware" and "Downgrade".
+     */
+    @Test
+    public void shouldFailDowngradeFromTimestampedWindowStoreWithHeadersToTimestampedWindowStore() throws Exception {
+        final Properties props = props();
+        setupAndPopulateWindowStoreWithHeaders(props, singletonList(KeyValue.pair("key1", 100L)));
+        kafkaStreams = null;
+
+        // Attempt to downgrade to non-headers window store
+        final StreamsBuilder downgradedBuilder = new StreamsBuilder();
+        downgradedBuilder.addStateStore(
+                Stores.timestampedWindowStoreBuilder(
+                    Stores.persistentTimestampedWindowStore(
+                        WINDOW_STORE_NAME,
+                        Duration.ofMillis(RETENTION_MS),
+                        Duration.ofMillis(WINDOW_SIZE_MS),
+                        false),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), Serdes.String()))
+            .process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
+
+        boolean sawDowngradeFailure = false;
+        try {
+            kafkaStreams.start();
+        } catch (final Exception e) {
+            // Walk the cause chain looking for the downgrade-specific state exception
+            for (Throwable link = e; link != null; link = link.getCause()) {
+                if (!(link instanceof ProcessorStateException)) {
+                    continue;
+                }
+                final String message = link.getMessage();
+                if (message != null && message.contains("headers-aware") && message.contains("Downgrade")) {
+                    sawDowngradeFailure = true;
+                    break;
+                }
+            }
+
+            if (!sawDowngradeFailure) {
+                throw new AssertionError("Expected ProcessorStateException about downgrade not being supported, but got: " + e.getMessage(), e);
+            }
+        } finally {
+            kafkaStreams.close(Duration.ofSeconds(30L));
+        }
+
+        // Reached without a matching exception only when start() did not throw at all
+        if (!sawDowngradeFailure) {
+            throw new AssertionError("Expected ProcessorStateException to be thrown when attempting to downgrade from headers-aware to non-headers window store");
+        }
+    }
+
+    /**
+     * Populates a headers-aware window store, calls cleanUp() on the closed instance, and then
+     * verifies that the application can run with the non-headers store and process new records.
+     */
+    @Test
+    public void shouldSuccessfullyDowngradeFromTimestampedWindowStoreWithHeadersAfterCleanup() throws Exception {
+        final Properties props = props();
+        setupAndPopulateWindowStoreWithHeaders(
+            props,
+            asList(KeyValue.pair("key1", 100L), KeyValue.pair("key2", 200L)));
+
+        kafkaStreams.cleanUp(); // Delete local state
+        kafkaStreams = null;
+
+        final StreamsBuilder downgradedBuilder = new StreamsBuilder();
+        downgradedBuilder.addStateStore(
+                Stores.timestampedWindowStoreBuilder(
+                    Stores.persistentTimestampedWindowStore(
+                        WINDOW_STORE_NAME,
+                        Duration.ofMillis(RETENTION_MS),
+                        Duration.ofMillis(WINDOW_SIZE_MS),
+                        false),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), Serdes.String()))
+            .process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
+        kafkaStreams.start();
+
+        // New records must be written and read back through the non-headers store
+        final long restartTime = CLUSTER.time.milliseconds();
+        processWindowedKeyValueAndVerifyTimestamped("key3", "value3", restartTime + 300);
+        processWindowedKeyValueAndVerifyTimestamped("key4", "value4", restartTime + 400);
+
+        kafkaStreams.close();
+    }
+
+    /**
+     * Reports whether the window store currently holds an entry for {@code key} in the window
+     * whose start is {@code timestamp} aligned down to WINDOW_SIZE_MS.
+     *
+     * @return {@code true} if such an entry is found; {@code false} if it is not found, the store
+     *         is not available, or querying the store throws
+     */
+    private boolean windowStoreContainsKey(final String key, final long timestamp) {
+        final long expectedWindowStart = timestamp - (timestamp % WINDOW_SIZE_MS);
+        try {
+            final ReadOnlyWindowStore<String, ValueTimestampHeaders<String>> windowStore =
+                IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, QueryableStoreTypes.windowStore());
+            if (windowStore == null) {
+                return false;
+            }
+
+            boolean found = false;
+            try (final KeyValueIterator<Windowed<String>, ValueTimestampHeaders<String>> entries = windowStore.all()) {
+                while (!found && entries.hasNext()) {
+                    final Windowed<String> windowedKey = entries.next().key;
+                    found = windowedKey.key().equals(key)
+                        && windowedKey.window().start() == expectedWindowStart;
+                }
+            }
+            return found;
+        } catch (final Exception e) {
+            // Treated as "not there yet"; callers poll this method
+            return false;
+        }
+    }
+
+    /**
+     * Starts the headers-aware window store topology, produces one record per entry of
+     * {@code records}, waits until every record is visible in the store, and closes the
+     * streams instance again.
+     *
+     * @param props Streams properties
+     * @param records List of (key, timestampOffset) tuples. Values will be generated as "value{N}",
+     *                with N being the 1-based position in the list
+     * @return base time used for record timestamps
+     */
+    private long setupAndPopulateWindowStoreWithHeaders(final Properties props,
+                                                        final List<KeyValue<String, Long>> records) throws Exception {
+        final long baseTime = setupWindowStoreWithHeaders(props);
+
+        int sequence = 0;
+        for (final KeyValue<String, Long> entry : records) {
+            sequence++;
+            produceRecordWithHeaders(entry.key, "value" + sequence, baseTime + entry.value);
+        }
+
+        // Wait for all records to be processed
+        TestUtils.waitForCondition(
+            () -> records.stream()
+                .allMatch(entry -> windowStoreContainsKey(entry.key, baseTime + entry.value)),
+            30_000L,
+            "Store was not populated with expected data"
+        );
+
+        kafkaStreams.close();
+        return baseTime;
+    }
+
+    /**
+     * Builds a topology with a persistent headers-aware window store fed from the input stream,
+     * assigns a new KafkaStreams instance to {@code kafkaStreams} and starts it.
+     *
+     * @param props Streams properties used to create the KafkaStreams instance
+     * @return the cluster time in milliseconds, read after the instance was started
+     */
+    private long setupWindowStoreWithHeaders(final Properties props) {
+        final StreamsBuilder headersBuilder = new StreamsBuilder();
+        headersBuilder.addStateStore(
+                Stores.timestampedWindowStoreWithHeadersBuilder(
+                    Stores.persistentTimestampedWindowStoreWithHeaders(
+                        WINDOW_STORE_NAME,
+                        Duration.ofMillis(RETENTION_MS),
+                        Duration.ofMillis(WINDOW_SIZE_MS),
+                        false),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), Serdes.String()))
+            .process(TimestampedWindowedWithHeadersProcessor::new, WINDOW_STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(headersBuilder.build(), props);
+        kafkaStreams.start();
+
+        final long baseTime = CLUSTER.time.milliseconds();
+        return baseTime;
+    }
+
+    /**
+     * Produces a single record with a fixed {@code source=test} header to the input stream.
+     *
+     * @param key       record key
+     * @param value     record value
+     * @param timestamp record timestamp
+     * @throws Exception if producing the record fails
+     */
+    private void produceRecordWithHeaders(final String key, final String value, final long timestamp) throws Exception {
+        final Headers headers = new RecordHeaders();
+        // Explicit charset: the header bytes must not depend on the platform default encoding
+        headers.add("source", "test".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputStream,
+            singletonList(KeyValue.pair(key, value)),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class),
+            headers,
+            timestamp,
+            false);
+    }
 }
\ No newline at end of file

Reply via email to