This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 95378f0  KAFKA-6782: solved the bug of restoration of aborted messages for GlobalStateStore and KGlobalTable (#4900)
95378f0 is described below

commit 95378f0dd3bb1293f2e0d19a54d4a2febfc14e7b
Author: Gitomain <lingxiaowan...@gmail.com>
AuthorDate: Tue Jun 12 20:54:07 2018 +0200

    KAFKA-6782: solved the bug of restoration of aborted messages for GlobalStateStore and KGlobalTable (#4900)
    
    Reviewer: Matthias J. Sax <matth...@confluent.io>, Bill Bejeck <b...@confluent.io>, Guozhang Wang <guozh...@confluent.io>
---
 .gitignore                                         |   1 +
 kafka                                              |   1 +
 .../internals/GlobalStateManagerImpl.java          |   2 +-
 ...st.java => GlobalKTableEOSIntegrationTest.java} | 117 +++++++++++++++------
 .../integration/GlobalKTableIntegrationTest.java   |  60 ++---------
 .../integration/utils/IntegrationTestUtils.java    |  35 ++++--
 .../processor/internals/StreamTaskTest.java        |   6 +-
 7 files changed, 127 insertions(+), 95 deletions(-)

diff --git a/.gitignore b/.gitignore
index 6088349..964c8f6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,6 @@
 dist
 *classes
+*.class
 target/
 build/
 build_eclipse/
diff --git a/kafka b/kafka
new file mode 160000
index 0000000..cc43e77
--- /dev/null
+++ b/kafka
@@ -0,0 +1 @@
+Subproject commit cc43e77bbbfad71883011186de55603c936cbcd1
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 6052f96..036bb1d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -192,8 +192,8 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
                     if (record.key() != null) {
                         restoreRecords.add(KeyValue.pair(record.key(), record.value()));
                     }
-                    offset = consumer.position(topicPartition);
                 }
+                offset = consumer.position(topicPartition);
                 stateRestoreAdapter.restoreAll(restoreRecords);
                 stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size());
                 restoreCount += restoreRecords.size();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
similarity index 77%
copy from streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
copy to streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
index ba8841a..9c202f1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
@@ -34,7 +34,6 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -58,7 +57,7 @@ import java.util.Map;
 import java.util.Properties;
 
 @Category({IntegrationTest.class})
-public class GlobalKTableIntegrationTest {
+public class GlobalKTableEOSIntegrationTest {
     private static final int NUM_BROKERS = 1;
     private static final Properties BROKER_CONFIG;
     static {
@@ -85,17 +84,15 @@ public class GlobalKTableIntegrationTest {
             return value1 + "+" + value2;
         }
     };
+    private final String globalStore = "globalStore";
+    private final Map<String, String> results = new HashMap<>();
     private StreamsBuilder builder;
     private Properties streamsConfiguration;
     private KafkaStreams kafkaStreams;
-    private String globalOne;
-    private String inputStream;
-    private String inputTable;
-    private final String globalStore = "globalStore";
+    private String globalTableTopic;
+    private String streamTopic;
     private GlobalKTable<Long, String> globalTable;
     private KStream<String, Long> stream;
-    private KTable<String, Long> table;
-    final Map<String, String> results = new HashMap<>();
     private ForeachAction<String, String> foreachAction;
 
     @Before
@@ -104,22 +101,21 @@ public class GlobalKTableIntegrationTest {
         builder = new StreamsBuilder();
         createTopics();
         streamsConfiguration = new Properties();
-        final String applicationId = "globalOne-table-test-" + testNo;
+        final String applicationId = "globalTableTopic-table-eos-test-" + 
testNo;
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, 
applicationId);
-        streamsConfiguration
-                .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
         
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         
streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, 
true);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
-        globalTable = builder.globalTable(globalOne, 
Consumed.with(Serdes.Long(), Serdes.String()),
+        streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
"exactly_once");
+        globalTable = builder.globalTable(globalTableTopic, 
Consumed.with(Serdes.Long(), Serdes.String()),
                                           Materialized.<Long, String, 
KeyValueStore<Bytes, byte[]>>as(globalStore)
                                                   .withKeySerde(Serdes.Long())
                                                   
.withValueSerde(Serdes.String()));
         final Consumed<String, Long> stringLongConsumed = 
Consumed.with(Serdes.String(), Serdes.Long());
-        stream = builder.stream(inputStream, stringLongConsumed);
-        table = builder.table(inputTable, stringLongConsumed);
+        stream = builder.stream(streamTopic, stringLongConsumed);
         foreachAction = new ForeachAction<String, String>() {
             @Override
             public void apply(final String key, final String value) {
@@ -142,7 +138,7 @@ public class GlobalKTableIntegrationTest {
         streamTableJoin.foreach(foreachAction);
         produceInitialGlobalTableValues();
         startStreams();
-        produceTopicValues(inputStream);
+        produceTopicValues(streamTopic);
 
         final Map<String, String> expected = new HashMap<>();
         expected.put("a", "1+A");
@@ -169,7 +165,7 @@ public class GlobalKTableIntegrationTest {
                 return "J".equals(replicatedStore.get(5L));
             }
         }, 30000, "waiting for data in replicated store");
-        produceTopicValues(inputStream);
+        produceTopicValues(streamTopic);
 
         expected.put("a", "1+F");
         expected.put("b", "2+G");
@@ -191,7 +187,7 @@ public class GlobalKTableIntegrationTest {
         streamTableJoin.foreach(foreachAction);
         produceInitialGlobalTableValues();
         startStreams();
-        produceTopicValues(inputStream);
+        produceTopicValues(streamTopic);
 
         final Map<String, String> expected = new HashMap<>();
         expected.put("a", "1+A");
@@ -218,7 +214,7 @@ public class GlobalKTableIntegrationTest {
             }
         }, 30000, "waiting for data in replicated store");
 
-        produceTopicValues(inputStream);
+        produceTopicValues(streamTopic);
 
         expected.put("a", "1+F");
         expected.put("b", "2+G");
@@ -236,7 +232,8 @@ public class GlobalKTableIntegrationTest {
 
     @Test
     public void shouldRestoreTransactionalMessages() throws Exception {
-        produceInitialGlobalTableValues(true);
+        produceInitialGlobalTableValues();
+
         startStreams();
 
         final Map<Long, String> expected = new HashMap<>();
@@ -263,17 +260,49 @@ public class GlobalKTableIntegrationTest {
                 return result.equals(expected);
             }
         }, 30000L, "waiting for initial values");
-        System.out.println("no failed test");
     }
+    
+    @Test
+    public void shouldNotRestoreAbortedMessages() throws Exception {
+        produceAbortedMessages();
+        produceInitialGlobalTableValues();
+        produceAbortedMessages();
 
-    private void createTopics() throws InterruptedException {
-        inputStream = "input-stream-" + testNo;
-        inputTable = "input-table-" + testNo;
-        globalOne = "globalOne-" + testNo;
-        CLUSTER.createTopics(inputStream, inputTable);
-        CLUSTER.createTopic(globalOne, 2, 1);
+        startStreams();
+        
+        final Map<Long, String> expected = new HashMap<>();
+        expected.put(1L, "A");
+        expected.put(2L, "B");
+        expected.put(3L, "C");
+        expected.put(4L, "D");
+
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                ReadOnlyKeyValueStore<Long, String> store = null;
+                try {
+                    store = kafkaStreams.store(globalStore, 
QueryableStoreTypes.<Long, String>keyValueStore());
+                } catch (InvalidStateStoreException ex) {
+                    return false;
+                }
+                Map<Long, String> result = new HashMap<>();
+                Iterator<KeyValue<Long, String>> it = store.all();
+                while (it.hasNext()) {
+                    KeyValue<Long, String> kv = it.next();
+                    result.put(kv.key, kv.value);
+                }
+                return result.equals(expected);
+            }
+        }, 30000L, "waiting for initial values");
     }
 
+    private void createTopics() throws InterruptedException {
+        streamTopic = "stream-" + testNo;
+        globalTableTopic = "globalTable-" + testNo;
+        CLUSTER.createTopics(streamTopic);
+        CLUSTER.createTopic(globalTableTopic, 2, 1);
+    }
+    
     private void startStreams() {
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.start();
@@ -296,23 +325,43 @@ public class GlobalKTableIntegrationTest {
                 mockTime);
     }
 
-    private void produceInitialGlobalTableValues() throws 
java.util.concurrent.ExecutionException, InterruptedException {
-        produceInitialGlobalTableValues(false);
+    private void produceAbortedMessages() throws Exception {
+        final Properties properties = new Properties();
+        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid");
+        properties.put(ProducerConfig.RETRIES_CONFIG, 1);
+        IntegrationTestUtils.produceAbortedKeyValuesSynchronouslyWithTimestamp(
+                globalTableTopic, Arrays.asList(
+                        new KeyValue<>(1L, "A"),
+                        new KeyValue<>(2L, "B"),
+                        new KeyValue<>(3L, "C"),
+                        new KeyValue<>(4L, "D")
+                        ), 
+                TestUtils.producerConfig(
+                                CLUSTER.bootstrapServers(),
+                                LongSerializer.class,
+                                StringSerializer.class,
+                                properties),
+                mockTime.milliseconds());
     }
 
-    private void produceInitialGlobalTableValues(final boolean 
enableTransactions) throws java.util.concurrent.ExecutionException, 
InterruptedException {
-        Properties properties = new Properties();
+    private void produceInitialGlobalTableValues() throws Exception {
+        produceInitialGlobalTableValues(true);
+    }
+
+    private void produceInitialGlobalTableValues(final boolean 
enableTransactions) throws Exception {
+        final Properties properties = new Properties();
         if (enableTransactions) {
             properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid");
             properties.put(ProducerConfig.RETRIES_CONFIG, 1);
         }
         IntegrationTestUtils.produceKeyValuesSynchronously(
-                globalOne,
+                globalTableTopic,
                 Arrays.asList(
                         new KeyValue<>(1L, "A"),
                         new KeyValue<>(2L, "B"),
                         new KeyValue<>(3L, "C"),
-                        new KeyValue<>(4L, "D")),
+                        new KeyValue<>(4L, "D")
+                        ),
                 TestUtils.producerConfig(
                         CLUSTER.bootstrapServers(),
                         LongSerializer.class,
@@ -324,7 +373,7 @@ public class GlobalKTableIntegrationTest {
 
     private void produceGlobalTableValues() throws Exception {
         IntegrationTestUtils.produceKeyValuesSynchronously(
-                globalOne,
+                globalTableTopic,
                 Arrays.asList(
                         new KeyValue<>(1L, "F"),
                         new KeyValue<>(2L, "G"),
@@ -338,6 +387,4 @@ public class GlobalKTableIntegrationTest {
                         new Properties()),
                 mockTime);
     }
-
-
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index ba8841a..0816aba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.ForeachAction;
@@ -53,23 +52,16 @@ import org.junit.experimental.categories.Category;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 
 @Category({IntegrationTest.class})
 public class GlobalKTableIntegrationTest {
     private static final int NUM_BROKERS = 1;
-    private static final Properties BROKER_CONFIG;
-    static {
-        BROKER_CONFIG = new Properties();
-        BROKER_CONFIG.put("transaction.state.log.replication.factor", (short) 
1);
-        BROKER_CONFIG.put("transaction.state.log.min.isr", 1);
-    }
 
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER =
-            new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
+            new EmbeddedKafkaCluster(NUM_BROKERS);
 
     private static volatile int testNo = 0;
     private final MockTime mockTime = CLUSTER.time;
@@ -233,39 +225,7 @@ public class GlobalKTableIntegrationTest {
             }
         }, 30000L, "waiting for final values");
     }
-
-    @Test
-    public void shouldRestoreTransactionalMessages() throws Exception {
-        produceInitialGlobalTableValues(true);
-        startStreams();
-
-        final Map<Long, String> expected = new HashMap<>();
-        expected.put(1L, "A");
-        expected.put(2L, "B");
-        expected.put(3L, "C");
-        expected.put(4L, "D");
-
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                ReadOnlyKeyValueStore<Long, String> store = null;
-                try {
-                    store = kafkaStreams.store(globalStore, 
QueryableStoreTypes.<Long, String>keyValueStore());
-                } catch (InvalidStateStoreException ex) {
-                    return false;
-                }
-                Map<Long, String> result = new HashMap<>();
-                Iterator<KeyValue<Long, String>> it = store.all();
-                while (it.hasNext()) {
-                    KeyValue<Long, String> kv = it.next();
-                    result.put(kv.key, kv.value);
-                }
-                return result.equals(expected);
-            }
-        }, 30000L, "waiting for initial values");
-        System.out.println("no failed test");
-    }
-
+    
     private void createTopics() throws InterruptedException {
         inputStream = "input-stream-" + testNo;
         inputTable = "input-table-" + testNo;
@@ -273,7 +233,7 @@ public class GlobalKTableIntegrationTest {
         CLUSTER.createTopics(inputStream, inputTable);
         CLUSTER.createTopic(globalOne, 2, 1);
     }
-
+    
     private void startStreams() {
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.start();
@@ -296,11 +256,11 @@ public class GlobalKTableIntegrationTest {
                 mockTime);
     }
 
-    private void produceInitialGlobalTableValues() throws 
java.util.concurrent.ExecutionException, InterruptedException {
+    private void produceInitialGlobalTableValues() throws Exception {
         produceInitialGlobalTableValues(false);
     }
 
-    private void produceInitialGlobalTableValues(final boolean 
enableTransactions) throws java.util.concurrent.ExecutionException, 
InterruptedException {
+    private void produceInitialGlobalTableValues(final boolean 
enableTransactions) throws Exception {
         Properties properties = new Properties();
         if (enableTransactions) {
             properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid");
@@ -312,14 +272,14 @@ public class GlobalKTableIntegrationTest {
                         new KeyValue<>(1L, "A"),
                         new KeyValue<>(2L, "B"),
                         new KeyValue<>(3L, "C"),
-                        new KeyValue<>(4L, "D")),
+                        new KeyValue<>(4L, "D")
+                        ),
                 TestUtils.producerConfig(
                         CLUSTER.bootstrapServers(),
                         LongSerializer.class,
-                        StringSerializer.class,
-                        properties),
-                mockTime,
-                enableTransactions);
+                        StringSerializer.class
+                        ),
+                mockTime);
     }
 
     private void produceGlobalTableValues() throws Exception {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index e8cd59e..304a3e5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -140,16 +140,38 @@ public class IntegrationTestUtils {
             producer.flush();
         }
     }
+    
+    public static <K, V> void 
produceAbortedKeyValuesSynchronouslyWithTimestamp(final String topic,
+                                                                               
 final Collection<KeyValue<K, V>> records,
+                                                                               
 final Properties producerConfig,
+                                                                               
 final Long timestamp)
+        throws ExecutionException, InterruptedException {
+        try (final Producer<K, V> producer = new 
KafkaProducer<>(producerConfig)) {
+            producer.initTransactions();
+            for (final KeyValue<K, V> record : records) {
+                producer.beginTransaction();
+                final Future<RecordMetadata> f = producer
+                        .send(new ProducerRecord<>(topic, null, timestamp, 
record.key, record.value));
+                f.get();
+                producer.abortTransaction();
+            }
+        }    
+    }
 
-    public static <V> void produceValuesSynchronously(
-        final String topic, final Collection<V> records, final Properties 
producerConfig, final Time time)
+    public static <V> void produceValuesSynchronously(final String topic,
+                                                      final Collection<V> 
records,
+                                                      final Properties 
producerConfig,
+                                                      final Time time)
         throws ExecutionException, InterruptedException {
         IntegrationTestUtils.produceValuesSynchronously(topic, records, 
producerConfig, time, false);
     }
 
-    public static <V> void produceValuesSynchronously(
-        final String topic, final Collection<V> records, final Properties 
producerConfig, final Time time, final boolean enableTransactions)
-        throws ExecutionException, InterruptedException {
+    public static <V> void produceValuesSynchronously(final String topic,
+                                                      final Collection<V> 
records,
+                                                      final Properties 
producerConfig,
+                                                      final Time time,
+                                                      final boolean 
enableTransactions)
+            throws ExecutionException, InterruptedException {
         final Collection<KeyValue<Object, V>> keyedRecords = new ArrayList<>();
         for (final V value : records) {
             final KeyValue<Object, V> kv = new KeyValue<>(null, value);
@@ -161,10 +183,9 @@ public class IntegrationTestUtils {
     public static <K, V> List<KeyValue<K, V>> 
waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,
                                                                                
   final String topic,
                                                                                
   final int expectedNumRecords) throws InterruptedException {
-
         return waitUntilMinKeyValueRecordsReceived(consumerConfig, topic, 
expectedNumRecords, DEFAULT_TIMEOUT);
     }
-
+    
     /**
      * Wait until enough data (key-value records) has been consumed.
      *
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index caa6cb7..3321da5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -905,10 +905,12 @@ public class StreamTaskTest {
 
     @Test
     public void shouldNotThrowOnCloseIfTaskWasNotInitializedWithEosEnabled() {
-        task = createStatelessTask(true);
+        final MockProducer producer = new MockProducer();
+        task = new StreamTask(taskId00, applicationId, partitions, topology, 
consumer, changelogReader,
+            eosConfig, streamsMetrics, stateDirectory, null, time, producer);
 
-        assertTrue(!producer.transactionInFlight());
         task.close(false, false);
+        task = null;
     }
 
     @Test

-- 
To stop receiving notification emails like this one, please contact
mj...@apache.org.

Reply via email to