This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 2576e1d  KAFKA-12508: Disable KIP-557 (#10397)
2576e1d is described below

commit 2576e1d91d7c32523f0e50472532713c7d96427f
Author: John Roesler <[email protected]>
AuthorDate: Thu Mar 25 14:42:26 2021 -0500

    KAFKA-12508: Disable KIP-557 (#10397)
    
    A major issue has been raised that this implementation of
    emit-on-change is vulnerable to a number of data-loss bugs
    in the presence of recovery with dirty state under at-least-once
    semantics. This should be fixed in the future when we implement
    a way to avoid or clean up the dirty state under at-least-once,
    at which point it will be safe to re-introduce KIP-557 and
    complete it.
    
    Reviewers: A. Sophie Blee-Goldman <[email protected]>
---
 .../streams/kstream/internals/KTableSource.java    | 37 ++++------------------
 .../KTableKTableForeignKeyJoinIntegrationTest.java | 18 ++---------
 .../kstream/internals/KTableSourceTest.java        |  2 ++
 3 files changed, 11 insertions(+), 46 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index f6756f5..b9f3580 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -21,20 +21,15 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
-import 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore;
-import 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.RawAndDeserializedValue;
-import org.apache.kafka.streams.state.internals.WrappedStateStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Objects;
 
 import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
-import static 
org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.skippedIdempotentUpdatesSensor;
 
 public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
     private static final Logger LOG = 
LoggerFactory.getLogger(KTableSource.class);
@@ -79,11 +74,10 @@ public class KTableSource<K, V> implements 
ProcessorSupplier<K, V> {
 
     private class KTableSourceProcessor extends AbstractProcessor<K, V> {
 
-        private MeteredTimestampedKeyValueStore<K, V> store;
+        private TimestampedKeyValueStore<K, V> store;
         private TimestampedTupleForwarder<K, V> tupleForwarder;
         private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
-        private Sensor skippedIdempotentUpdatesSensor = null;
 
         @SuppressWarnings("unchecked")
         @Override
@@ -92,24 +86,12 @@ public class KTableSource<K, V> implements 
ProcessorSupplier<K, V> {
             metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
             if (queryableName != null) {
-                final StateStore stateStore = 
context.getStateStore(queryableName);
-                try {
-                    store = 
((WrappedStateStore<MeteredTimestampedKeyValueStore<K, V>, K, V>) 
stateStore).wrapped();
-                } catch (final ClassCastException e) {
-                    throw new IllegalStateException("Unexpected store type: " 
+ stateStore.getClass() + " for store: " + queryableName, e);
-                }
+                store = (TimestampedKeyValueStore<K, V>) 
context.getStateStore(queryableName);
                 tupleForwarder = new TimestampedTupleForwarder<>(
                     store,
                     context,
                     new TimestampedCacheFlushListener<>(context),
                     sendOldValues);
-                skippedIdempotentUpdatesSensor = 
skippedIdempotentUpdatesSensor(
-                    Thread.currentThread().getName(), 
-                    context.taskId().toString(), 
-                    ((InternalProcessorContext) context).currentNode().name(), 
-                    metrics
-                );
-
             }
         }
 
@@ -126,8 +108,7 @@ public class KTableSource<K, V> implements 
ProcessorSupplier<K, V> {
             }
 
             if (queryableName != null) {
-                final RawAndDeserializedValue<V> tuple = 
store.getWithBinary(key);
-                final ValueAndTimestamp<V> oldValueAndTimestamp = tuple.value;
+                final ValueAndTimestamp<V> oldValueAndTimestamp = 
store.get(key);
                 final V oldValue;
                 if (oldValueAndTimestamp != null) {
                     oldValue = oldValueAndTimestamp.value();
@@ -138,14 +119,8 @@ public class KTableSource<K, V> implements 
ProcessorSupplier<K, V> {
                 } else {
                     oldValue = null;
                 }
-                final ValueAndTimestamp<V> newValueAndTimestamp = 
ValueAndTimestamp.make(value, context().timestamp());
-                final boolean isDifferentValue = 
-                    store.putIfDifferentValues(key, newValueAndTimestamp, 
tuple.serializedValue);
-                if (isDifferentValue) {
-                    tupleForwarder.maybeForward(key, value, oldValue);
-                }  else {
-                    skippedIdempotentUpdatesSensor.record();
-                }
+                store.put(key, ValueAndTimestamp.make(value, 
context().timestamp()));
+                tupleForwarder.maybeForward(key, value, oldValue);
             } else {
                 context().forward(key, new Change<>(value, null));
             }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index 22a34b8..60104c4 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -49,8 +49,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.function.Function;
-import static java.util.Collections.singletonMap;
-
 
 import static java.util.Collections.emptyMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -384,16 +382,12 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
 
             // Deleting a non-joining record produces an unnecessary tombstone 
for inner joins, because
             // it's not possible to know whether a result was previously 
emitted.
-            // HOWEVER, when the final join result is materialized (either 
explicitly or
-            // implicitly by a subsequent join), we _can_ detect that the 
tombstone is unnecessary and drop it.
             // For the left join, the tombstone is necessary.
             left.pipeInput("lhs1", (String) null);
             {
                 assertThat(
                     outputTopic.readKeyValuesToMap(),
-                    is(leftJoin || !(materialized || rejoin)
-                           ? mkMap(mkEntry("lhs1", null))
-                           : emptyMap())
+                    is(mkMap(mkEntry("lhs1", null)))
                 );
                 if (materialized) {
                     assertThat(
@@ -470,15 +464,11 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             // "moving" our subscription to another non-existent FK results in 
an unnecessary tombstone for inner join,
             // since it impossible to know whether the prior FK existed or not 
(and thus whether any results have
             // previously been emitted)
-            // previously been emitted). HOWEVER, when the final join result 
is materialized (either explicitly or
-            // implicitly by a subsequent join), we _can_ detect that the 
tombstone is unnecessary and drop it.
             // The left join emits a _necessary_ update (since the lhs record 
has actually changed)
             left.pipeInput("lhs1", "lhsValue1|rhs2");
             assertThat(
                 outputTopic.readKeyValuesToMap(),
-                is(leftJoin
-                       ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs2,null)"))
-                       : (materialized || rejoin) ? emptyMap() : 
singletonMap("lhs1", null))
+                is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs2,null)" : 
null)))
             );
             if (materialized) {
                 assertThat(
@@ -490,9 +480,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             left.pipeInput("lhs1", "lhsValue1|rhs3");
             assertThat(
                 outputTopic.readKeyValuesToMap(),
-                is(leftJoin
-                       ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs3,null)"))
-                       : (materialized || rejoin) ? emptyMap() : 
singletonMap("lhs1", null))
+                is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs3,null)" : 
null)))
             );
             if (materialized) {
                 assertThat(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index e503403..02590f6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.test.MockApiProcessor;
 import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.time.Duration;
@@ -91,6 +92,7 @@ public class KTableSourceTest {
             supplier.theCapturedProcessor().processed());
     }
 
+    @Ignore // we have disabled KIP-557 until KAFKA-12508 can be properly 
addressed
     @Test
     public void testKTableSourceEmitOnChange() {
         final StreamsBuilder builder = new StreamsBuilder();

Reply via email to