This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new d90c5ca KAFKA-7895: Revert suppress changelog bugfix for 2.1 (#7373)
d90c5ca is described below
commit d90c5ca32e3582cb13a08671f9382e050b22fabd
Author: John Roesler <[email protected]>
AuthorDate: Fri Sep 27 15:08:45 2019 -0500
KAFKA-7895: Revert suppress changelog bugfix for 2.1 (#7373)
Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax <[email protected]>
---
.../InMemoryTimeOrderedKeyValueBuffer.java | 57 +++-----
.../SuppressionDurabilityIntegrationTest.java | 60 ---------
.../internals/TimeOrderedKeyValueBufferTest.java | 145 ++++-----------------
3 files changed, 46 insertions(+), 216 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index 82d07a8..42abd70 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -17,9 +17,6 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.header.internals.RecordHeader;
-import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.serialization.Serde;
@@ -50,8 +47,6 @@ import static java.util.Objects.requireNonNull;
public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements
TimeOrderedKeyValueBuffer<K, V> {
private static final BytesSerializer KEY_SERIALIZER = new
BytesSerializer();
private static final ByteArraySerializer VALUE_SERIALIZER = new
ByteArraySerializer();
- private static final RecordHeaders V_1_CHANGELOG_HEADERS =
- new RecordHeaders(new Header[] {new RecordHeader("v", new byte[]
{(byte) 1})});
private final Map<Bytes, BufferKey> index = new HashMap<>();
private final TreeMap<BufferKey, ContextualRecord> sortedMap = new
TreeMap<>();
@@ -89,7 +84,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V>
implements TimeOrdere
* As of 2.1, there's no way for users to directly interact with the
buffer,
* so this method is implemented solely to be called by Streams (which
* it will do based on the {@code cache.max.bytes.buffering} config.
- *
+ * <p>
* It's currently a no-op.
*/
@Override
@@ -101,7 +96,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V>
implements TimeOrdere
* As of 2.1, there's no way for users to directly interact with the
buffer,
* so this method is implemented solely to be called by Streams (which
* it will do based on the {@code cache.max.bytes.buffering} config.
- *
+ * <p>
* It's currently a no-op.
*/
@Override
@@ -259,23 +254,23 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K,
V> implements TimeOrdere
}
private void logValue(final Bytes key, final BufferKey bufferKey, final
ContextualRecord value) {
- final byte[] serializedContextualRecord = value.serialize();
+ final byte[] innerValue = value.value();
final int sizeOfBufferTime = Long.BYTES;
- final int sizeOfContextualRecord = serializedContextualRecord.length;
+ final int sizeOfContextualRecord = innerValue.length;
final byte[] timeAndContextualRecord = ByteBuffer.wrap(new
byte[sizeOfBufferTime + sizeOfContextualRecord])
.putLong(bufferKey.time)
-
.put(serializedContextualRecord)
+ .put(innerValue)
.array();
collector.send(
changelogTopic,
key,
timeAndContextualRecord,
- V_1_CHANGELOG_HEADERS,
- partition,
null,
+ partition,
+ value.recordContext().timestamp(),
KEY_SERIALIZER,
VALUE_SERIALIZER
);
@@ -323,32 +318,20 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K,
V> implements TimeOrdere
final long time = timeAndValue.getLong();
final byte[] value = new byte[record.value().length - 8];
timeAndValue.get(value);
- if (record.headers().lastHeader("v") == null) {
- cleanPut(
- time,
- key,
- new ContextualRecord(
- value,
- new ProcessorRecordContext(
- record.timestamp(),
- record.offset(),
- record.partition(),
- record.topic(),
- record.headers()
- )
+ cleanPut(
+ time,
+ key,
+ new ContextualRecord(
+ value,
+ new ProcessorRecordContext(
+ record.timestamp(),
+ record.offset(),
+ record.partition(),
+ record.topic(),
+ null
)
- );
- } else if
(V_1_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v")))
{
- final ContextualRecord contextualRecord =
ContextualRecord.deserialize(ByteBuffer.wrap(value));
-
- cleanPut(
- time,
- key,
- contextualRecord
- );
- } else {
- throw new IllegalArgumentException("Restoring apparently
invalid changelog record: " + record);
- }
+ )
+ );
}
if (record.partition() != partition) {
throw new IllegalStateException(
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
index fa49386..9b8982b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
@@ -27,7 +27,6 @@ import
org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
@@ -38,9 +37,6 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.kstream.Transformer;
-import org.apache.kafka.streams.kstream.TransformerSupplier;
-import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.junit.ClassRule;
@@ -49,8 +45,6 @@ import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.HashSet;
@@ -60,7 +54,6 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
import static java.lang.Long.MAX_VALUE;
import static java.time.Duration.ofMillis;
@@ -77,7 +70,6 @@ import static
org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecord
import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
@RunWith(Parameterized.class)
@Category(IntegrationTest.class)
@@ -132,16 +124,11 @@ public class SuppressionDurabilityIntegrationTest {
final AtomicInteger eventCount = new AtomicInteger(0);
suppressedCounts.foreach((key, value) -> eventCount.incrementAndGet());
- // expect all post-suppress records to keep the right input topic
- final MetadataValidator metadataValidator = new
MetadataValidator(input);
-
suppressedCounts
- .transform(metadataValidator)
.to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
valueCounts
.toStream()
- .transform(metadataValidator)
.to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
final Properties streamsConfig = mkProperties(mkMap(
@@ -237,59 +224,12 @@ public class SuppressionDurabilityIntegrationTest {
)
);
- metadataValidator.raiseExceptionIfAny();
-
} finally {
driver.close();
cleanStateAfterTest(CLUSTER, driver);
}
}
- private static final class MetadataValidator implements
TransformerSupplier<String, Long, KeyValue<String, Long>> {
- private static final Logger LOG =
LoggerFactory.getLogger(MetadataValidator.class);
- private final AtomicReference<Throwable> firstException = new
AtomicReference<>();
- private final String topic;
-
- MetadataValidator(final String topic) {
- this.topic = topic;
- }
-
- @Override
- public Transformer<String, Long, KeyValue<String, Long>> get() {
- return new Transformer<String, Long, KeyValue<String, Long>>() {
- private ProcessorContext context;
-
- @Override
- public void init(final ProcessorContext context) {
- this.context = context;
- }
-
- @Override
- public KeyValue<String, Long> transform(final String key,
final Long value) {
- try {
- assertThat(context.topic(), equalTo(topic));
- } catch (final Throwable e) {
- firstException.compareAndSet(null, e);
- LOG.error("Validation Failed", e);
- }
- return new KeyValue<>(key, value);
- }
-
- @Override
- public void close() {
-
- }
- };
- }
-
- void raiseExceptionIfAny() {
- final Throwable exception = firstException.get();
- if (exception != null) {
- throw new AssertionError("Got an exception during run",
exception);
- }
- }
- }
-
private static void verifyOutput(final String topic, final
List<KeyValueTimestamp<String, Long>> keyValueTimestamps) {
final Properties properties = mkProperties(
mkMap(
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
index 6ae36d4..8253728 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -53,6 +53,7 @@ import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.fail;
@RunWith(Parameterized.class)
@@ -258,8 +259,10 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
final TimeOrderedKeyValueBuffer<String, String> buffer =
bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
buffer.init(context, buffer);
- putRecord(buffer, context, 2L, 0L, "asdf", "2093j");
- putRecord(buffer, context, 1L, 1L, "zxcv", "3gon4i");
+ final String value1 = "2093j";
+ final String value2 = "3gon4i";
+ putRecord(buffer, context, 2L, 0L, "asdf", value1);
+ putRecord(buffer, context, 1L, 1L, "zxcv", value2);
putRecord(buffer, context, 0L, 2L, "deleteme", "deadbeef");
// replace "deleteme" with a tombstone
@@ -272,20 +275,21 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
// which we can't compare for equality using ProducerRecord.
// As a workaround, I'm deserializing them and shoving them in a
KeyValue, just for ease of testing.
- final List<ProducerRecord<String, KeyValue<Long, ContextualRecord>>>
collected =
+ final List<ProducerRecord<String, KeyValue<Long, String>>> collected =
((MockRecordCollector) context.recordCollector())
.collected()
.stream()
.map(pr -> {
- final KeyValue<Long, ContextualRecord> niceValue;
+ final KeyValue<Long, String> niceValue;
if (pr.value() == null) {
niceValue = null;
} else {
final byte[] timestampAndValue = pr.value();
final ByteBuffer wrap =
ByteBuffer.wrap(timestampAndValue);
final long timestamp = wrap.getLong();
- final ContextualRecord contextualRecord =
ContextualRecord.deserialize(wrap);
- niceValue = new KeyValue<>(timestamp,
contextualRecord);
+ final byte[] value = new byte[pr.value().length -
Long.BYTES];
+ wrap.get(value);
+ niceValue = new KeyValue<>(timestamp, new
String(value, UTF_8));
}
return new ProducerRecord<>(pr.topic(),
@@ -303,21 +307,21 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
null,
"deleteme",
null,
- new RecordHeaders()
+ null
),
new ProducerRecord<>(APP_ID + "-" + testName + "-changelog",
0,
- null,
+ 1L,
"zxcv",
- new KeyValue<>(1L, getRecord("3gon4i", 1)),
- V_1_CHANGELOG_HEADERS
+ new KeyValue<>(1L, value2),
+ null
),
new ProducerRecord<>(APP_ID + "-" + testName + "-changelog",
0,
- null,
+ 0L,
"asdf",
- new KeyValue<>(2L, getRecord("2093j", 0)),
- V_1_CHANGELOG_HEADERS
+ new KeyValue<>(2L, value1),
+ null
)
)));
@@ -406,118 +410,18 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
new Eviction<>(
"zxcv",
"3o4im",
- new ProcessorRecordContext(2L, 2, 0, "changelog-topic", new
RecordHeaders())),
+ new ProcessorRecordContext(2L, 2, 0, "changelog-topic", null)),
new Eviction<>(
"asdf",
"qwer",
- new ProcessorRecordContext(1L, 1, 0, "changelog-topic", new
RecordHeaders()))
+ new ProcessorRecordContext(1L, 1, 0, "changelog-topic", null))
)));
cleanup(context, buffer);
}
@Test
- public void shouldRestoreNewFormat() {
- final TimeOrderedKeyValueBuffer<String, String> buffer =
bufferSupplier.apply(testName);
- final MockInternalProcessorContext context = makeContext();
- buffer.init(context, buffer);
-
- final RecordBatchingStateRestoreCallback stateRestoreCallback =
- (RecordBatchingStateRestoreCallback)
context.stateRestoreCallback(testName);
-
- context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "",
null));
-
- final RecordHeaders v1FlagHeaders = new RecordHeaders(new Header[]
{new RecordHeader("v", new byte[] {(byte) 1})});
-
- final byte[] todeleteValue = getRecord("doomed", 0).serialize();
- final byte[] asdfValue = getRecord("qwer", 1).serialize();
- final byte[] zxcvValue = getRecord("3o4im", 2).serialize();
- stateRestoreCallback.restoreBatch(asList(
- new ConsumerRecord<>("changelog-topic",
- 0,
- 0,
- 999,
- TimestampType.CREATE_TIME,
- -1L,
- -1,
- -1,
- "todelete".getBytes(UTF_8),
- ByteBuffer.allocate(Long.BYTES +
todeleteValue.length).putLong(0L).put(todeleteValue).array(),
- v1FlagHeaders),
- new ConsumerRecord<>("changelog-topic",
- 0,
- 1,
- 9999,
- TimestampType.CREATE_TIME,
- -1L,
- -1,
- -1,
- "asdf".getBytes(UTF_8),
- ByteBuffer.allocate(Long.BYTES +
asdfValue.length).putLong(2L).put(asdfValue).array(),
- v1FlagHeaders),
- new ConsumerRecord<>("changelog-topic",
- 0,
- 2,
- 99,
- TimestampType.CREATE_TIME,
- -1L,
- -1,
- -1,
- "zxcv".getBytes(UTF_8),
- ByteBuffer.allocate(Long.BYTES +
zxcvValue.length).putLong(1L).put(zxcvValue).array(),
- v1FlagHeaders)
- ));
-
- assertThat(buffer.numRecords(), is(3));
- assertThat(buffer.minTimestamp(), is(0L));
- assertThat(buffer.bufferSize(), is(130L));
-
- stateRestoreCallback.restoreBatch(singletonList(
- new ConsumerRecord<>("changelog-topic",
- 0,
- 3,
- 3,
- TimestampType.CREATE_TIME,
- -1L,
- -1,
- -1,
- "todelete".getBytes(UTF_8),
- null)
- ));
-
- assertThat(buffer.numRecords(), is(2));
- assertThat(buffer.minTimestamp(), is(1L));
- assertThat(buffer.bufferSize(), is(83L));
-
- // flush the buffer into a list in buffer order so we can make
assertions about the contents.
-
- final List<Eviction<String, String>> evicted = new LinkedList<>();
- buffer.evictWhile(() -> true, evicted::add);
-
- // Several things to note:
- // * The buffered records are ordered according to their buffer time
(serialized in the value of the changelog)
- // * The record timestamps are properly restored, and not conflated
with the record's buffer time.
- // * The keys and values are properly restored
- // * The record topic is set to the original input topic, *not* the
changelog topic
- // * The record offset preserves the original input record's offset,
*not* the offset of the changelog record
-
-
- assertThat(evicted, is(asList(
- new Eviction<>(
- "zxcv",
- "3o4im",
- getContext(2L)),
- new Eviction<>(
- "asdf",
- "qwer",
- getContext(1L)
- ))));
-
- cleanup(context, buffer);
- }
-
- @Test
- public void shouldNotRestoreUnrecognizedVersionRecord() {
+ public void shouldIgnoreHeadersOnRestore() {
final TimeOrderedKeyValueBuffer<String, String> buffer =
bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
buffer.init(context, buffer);
@@ -544,9 +448,12 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
ByteBuffer.allocate(Long.BYTES +
todeleteValue.length).putLong(0L).put(todeleteValue).array(),
unknownFlagHeaders)
));
- fail("expected an exception");
- } catch (final IllegalArgumentException expected) {
- // nothing to do.
+
+ final List<Eviction<String, String>> evicted = new LinkedList<>();
+ buffer.evictWhile(() -> buffer.numRecords() > 0, evicted::add);
+ assertThat(evicted.size(), is(1));
+ final Eviction<String, String> eviction = evicted.get(0);
+ assertThat(eviction.recordContext().headers(), nullValue());
} finally {
cleanup(context, buffer);
}