This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new 2576e1d KAFKA-12508: Disable KIP-557 (#10397)
2576e1d is described below
commit 2576e1d91d7c32523f0e50472532713c7d96427f
Author: John Roesler <[email protected]>
AuthorDate: Thu Mar 25 14:42:26 2021 -0500
KAFKA-12508: Disable KIP-557 (#10397)
A major issue has been raised: this implementation of
emit-on-change is vulnerable to a number of data-loss bugs
when recovering with dirty state under at-least-once
semantics. This should be fixed in the future, once we implement
a way to avoid or clean up dirty state under at-least-once;
at that point it will be safe to re-introduce KIP-557 and
complete it.
Reviewers: A. Sophie Blee-Goldman <[email protected]>
---
.../streams/kstream/internals/KTableSource.java | 37 ++++------------------
.../KTableKTableForeignKeyJoinIntegrationTest.java | 18 ++---------
.../kstream/internals/KTableSourceTest.java | 2 ++
3 files changed, 11 insertions(+), 46 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index f6756f5..b9f3580 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -21,20 +21,15 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
-import
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore;
-import
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.RawAndDeserializedValue;
-import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
import static
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
-import static
org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.skippedIdempotentUpdatesSensor;
public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
private static final Logger LOG =
LoggerFactory.getLogger(KTableSource.class);
@@ -79,11 +74,10 @@ public class KTableSource<K, V> implements
ProcessorSupplier<K, V> {
private class KTableSourceProcessor extends AbstractProcessor<K, V> {
- private MeteredTimestampedKeyValueStore<K, V> store;
+ private TimestampedKeyValueStore<K, V> store;
private TimestampedTupleForwarder<K, V> tupleForwarder;
private StreamsMetricsImpl metrics;
private Sensor droppedRecordsSensor;
- private Sensor skippedIdempotentUpdatesSensor = null;
@SuppressWarnings("unchecked")
@Override
@@ -92,24 +86,12 @@ public class KTableSource<K, V> implements
ProcessorSupplier<K, V> {
metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor =
droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(),
context.taskId().toString(), metrics);
if (queryableName != null) {
- final StateStore stateStore =
context.getStateStore(queryableName);
- try {
- store =
((WrappedStateStore<MeteredTimestampedKeyValueStore<K, V>, K, V>)
stateStore).wrapped();
- } catch (final ClassCastException e) {
- throw new IllegalStateException("Unexpected store type: "
+ stateStore.getClass() + " for store: " + queryableName, e);
- }
+ store = (TimestampedKeyValueStore<K, V>)
context.getStateStore(queryableName);
tupleForwarder = new TimestampedTupleForwarder<>(
store,
context,
new TimestampedCacheFlushListener<>(context),
sendOldValues);
- skippedIdempotentUpdatesSensor =
skippedIdempotentUpdatesSensor(
- Thread.currentThread().getName(),
- context.taskId().toString(),
- ((InternalProcessorContext) context).currentNode().name(),
- metrics
- );
-
}
}
@@ -126,8 +108,7 @@ public class KTableSource<K, V> implements
ProcessorSupplier<K, V> {
}
if (queryableName != null) {
- final RawAndDeserializedValue<V> tuple =
store.getWithBinary(key);
- final ValueAndTimestamp<V> oldValueAndTimestamp = tuple.value;
+ final ValueAndTimestamp<V> oldValueAndTimestamp =
store.get(key);
final V oldValue;
if (oldValueAndTimestamp != null) {
oldValue = oldValueAndTimestamp.value();
@@ -138,14 +119,8 @@ public class KTableSource<K, V> implements
ProcessorSupplier<K, V> {
} else {
oldValue = null;
}
- final ValueAndTimestamp<V> newValueAndTimestamp =
ValueAndTimestamp.make(value, context().timestamp());
- final boolean isDifferentValue =
- store.putIfDifferentValues(key, newValueAndTimestamp,
tuple.serializedValue);
- if (isDifferentValue) {
- tupleForwarder.maybeForward(key, value, oldValue);
- } else {
- skippedIdempotentUpdatesSensor.record();
- }
+ store.put(key, ValueAndTimestamp.make(value,
context().timestamp()));
+ tupleForwarder.maybeForward(key, value, oldValue);
} else {
context().forward(key, new Change<>(value, null));
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index 22a34b8..60104c4 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -49,8 +49,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;
-import static java.util.Collections.singletonMap;
-
import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -384,16 +382,12 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
// Deleting a non-joining record produces an unnecessary tombstone
for inner joins, because
// it's not possible to know whether a result was previously
emitted.
- // HOWEVER, when the final join result is materialized (either
explicitly or
- // implicitly by a subsequent join), we _can_ detect that the
tombstone is unnecessary and drop it.
// For the left join, the tombstone is necessary.
left.pipeInput("lhs1", (String) null);
{
assertThat(
outputTopic.readKeyValuesToMap(),
- is(leftJoin || !(materialized || rejoin)
- ? mkMap(mkEntry("lhs1", null))
- : emptyMap())
+ is(mkMap(mkEntry("lhs1", null)))
);
if (materialized) {
assertThat(
@@ -470,15 +464,11 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
// "moving" our subscription to another non-existent FK results in
an unnecessary tombstone for inner join,
// since it impossible to know whether the prior FK existed or not
(and thus whether any results have
// previously been emitted)
- // previously been emitted). HOWEVER, when the final join result
is materialized (either explicitly or
- // implicitly by a subsequent join), we _can_ detect that the
tombstone is unnecessary and drop it.
// The left join emits a _necessary_ update (since the lhs record
has actually changed)
left.pipeInput("lhs1", "lhsValue1|rhs2");
assertThat(
outputTopic.readKeyValuesToMap(),
- is(leftJoin
- ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs2,null)"))
- : (materialized || rejoin) ? emptyMap() :
singletonMap("lhs1", null))
+ is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs2,null)" :
null)))
);
if (materialized) {
assertThat(
@@ -490,9 +480,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
left.pipeInput("lhs1", "lhsValue1|rhs3");
assertThat(
outputTopic.readKeyValuesToMap(),
- is(leftJoin
- ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs3,null)"))
- : (materialized || rejoin) ? emptyMap() :
singletonMap("lhs1", null))
+ is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs3,null)" :
null)))
);
if (materialized) {
assertThat(
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index e503403..02590f6 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Ignore;
import org.junit.Test;
import java.time.Duration;
@@ -91,6 +92,7 @@ public class KTableSourceTest {
supplier.theCapturedProcessor().processed());
}
+ @Ignore // we have disabled KIP-557 until KAFKA-12508 can be properly
addressed
@Test
public void testKTableSourceEmitOnChange() {
final StreamsBuilder builder = new StreamsBuilder();