This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 735d664  KAFKA-6782: solved the bug of restoration of aborted messages 
for GlobalStateStore and KGlobalTable (#4900)
735d664 is described below

commit 735d664c786c1011391a19cd6a317764eea2d058
Author: Gitomain <lingxiaowan...@gmail.com>
AuthorDate: Tue Jun 12 20:54:07 2018 +0200

    KAFKA-6782: solved the bug of restoration of aborted messages for 
GlobalStateStore and KGlobalTable (#4900)
    
    Reviewer: Matthias J. Sax <matth...@confluent.io>, Bill Bejeck 
<b...@confluent.io>, Guozhang Wang <guozh...@confluent.io>
---
 .gitignore                                         |  1 +
 kafka                                              |  1 +
 .../internals/GlobalStateManagerImpl.java          |  2 +-
 ...st.java => GlobalKTableEOSIntegrationTest.java} | 74 +++++++++++++++++++---
 .../integration/GlobalKTableIntegrationTest.java   | 66 +++----------------
 .../integration/utils/IntegrationTestUtils.java    | 35 ++++++++--
 6 files changed, 103 insertions(+), 76 deletions(-)

diff --git a/.gitignore b/.gitignore
index 04f8fee..fe191ee 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,6 @@
 dist
 *classes
+*.class
 target/
 build/
 build_eclipse/
diff --git a/kafka b/kafka
new file mode 160000
index 0000000..cc43e77
--- /dev/null
+++ b/kafka
@@ -0,0 +1 @@
+Subproject commit cc43e77bbbfad71883011186de55603c936cbcd1
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 4fd7a59..79088d9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -271,8 +271,8 @@ public class GlobalStateManagerImpl extends 
AbstractStateManager implements Glob
                         if (record.key() != null) {
                             restoreRecords.add(KeyValue.pair(record.key(), 
record.value()));
                         }
-                        offset = globalConsumer.position(topicPartition);
                     }
+                    offset = globalConsumer.position(topicPartition);
                     stateRestoreAdapter.restoreAll(restoreRecords);
                     stateRestoreListener.onBatchRestored(topicPartition, 
storeName, offset, restoreRecords.size());
                     restoreCount += restoreRecords.size();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
similarity index 83%
copy from 
streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
copy to 
streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
index 8c6a30a..f7c0e55 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
@@ -57,7 +57,7 @@ import java.util.Map;
 import java.util.Properties;
 
 @Category({IntegrationTest.class})
-public class GlobalKTableIntegrationTest {
+public class GlobalKTableEOSIntegrationTest {
     private static final int NUM_BROKERS = 1;
     private static final Properties BROKER_CONFIG;
     static {
@@ -101,15 +101,15 @@ public class GlobalKTableIntegrationTest {
         builder = new StreamsBuilder();
         createTopics();
         streamsConfiguration = new Properties();
-        final String applicationId = "globalTableTopic-table-test-" + testNo;
+        final String applicationId = "globalTableTopic-table-eos-test-" + 
testNo;
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, 
applicationId);
-        streamsConfiguration
-                .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
         
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         
streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, 
true);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
"exactly_once");
         globalTable = builder.globalTable(globalTableTopic, 
Consumed.with(Serdes.Long(), Serdes.String()),
                                           Materialized.<Long, String, 
KeyValueStore<Bytes, byte[]>>as(globalStore)
                                                   .withKeySerde(Serdes.Long())
@@ -232,7 +232,8 @@ public class GlobalKTableIntegrationTest {
 
     @Test
     public void shouldRestoreTransactionalMessages() throws Exception {
-        produceInitialGlobalTableValues(true);
+        produceInitialGlobalTableValues();
+
         startStreams();
 
         final Map<Long, String> expected = new HashMap<>();
@@ -259,7 +260,40 @@ public class GlobalKTableIntegrationTest {
                 return result.equals(expected);
             }
         }, 30000L, "waiting for initial values");
-        System.out.println("no failed test");
+    }
+    
+    @Test
+    public void shouldNotRestoreAbortedMessages() throws Exception {
+        produceAbortedMessages();
+        produceInitialGlobalTableValues();
+        produceAbortedMessages();
+
+        startStreams();
+        
+        final Map<Long, String> expected = new HashMap<>();
+        expected.put(1L, "A");
+        expected.put(2L, "B");
+        expected.put(3L, "C");
+        expected.put(4L, "D");
+
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                ReadOnlyKeyValueStore<Long, String> store = null;
+                try {
+                    store = kafkaStreams.store(globalStore, 
QueryableStoreTypes.<Long, String>keyValueStore());
+                } catch (InvalidStateStoreException ex) {
+                    return false;
+                }
+                Map<Long, String> result = new HashMap<>();
+                Iterator<KeyValue<Long, String>> it = store.all();
+                while (it.hasNext()) {
+                    KeyValue<Long, String> kv = it.next();
+                    result.put(kv.key, kv.value);
+                }
+                return result.equals(expected);
+            }
+        }, 30000L, "waiting for initial values");
     }
 
     private void createTopics() throws InterruptedException {
@@ -268,7 +302,7 @@ public class GlobalKTableIntegrationTest {
         CLUSTER.createTopics(streamTopic);
         CLUSTER.createTopic(globalTableTopic, 2, 1);
     }
-
+    
     private void startStreams() {
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.start();
@@ -291,12 +325,31 @@ public class GlobalKTableIntegrationTest {
                 mockTime);
     }
 
+    private void produceAbortedMessages() throws Exception {
+        final Properties properties = new Properties();
+        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid");
+        properties.put(ProducerConfig.RETRIES_CONFIG, 1);
+        IntegrationTestUtils.produceAbortedKeyValuesSynchronouslyWithTimestamp(
+                globalTableTopic, Arrays.asList(
+                        new KeyValue<>(1L, "A"),
+                        new KeyValue<>(2L, "B"),
+                        new KeyValue<>(3L, "C"),
+                        new KeyValue<>(4L, "D")
+                        ), 
+                TestUtils.producerConfig(
+                                CLUSTER.bootstrapServers(),
+                                LongSerializer.class,
+                                StringSerializer.class,
+                                properties),
+                mockTime.milliseconds());
+    }
+
     private void produceInitialGlobalTableValues() throws Exception {
-        produceInitialGlobalTableValues(false);
+        produceInitialGlobalTableValues(true);
     }
 
     private void produceInitialGlobalTableValues(final boolean 
enableTransactions) throws Exception {
-        Properties properties = new Properties();
+        final Properties properties = new Properties();
         if (enableTransactions) {
             properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid");
             properties.put(ProducerConfig.RETRIES_CONFIG, 1);
@@ -307,7 +360,8 @@ public class GlobalKTableIntegrationTest {
                         new KeyValue<>(1L, "A"),
                         new KeyValue<>(2L, "B"),
                         new KeyValue<>(3L, "C"),
-                        new KeyValue<>(4L, "D")),
+                        new KeyValue<>(4L, "D")
+                        ),
                 TestUtils.producerConfig(
                         CLUSTER.bootstrapServers(),
                         LongSerializer.class,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 8c6a30a..900e652 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.integration;
 
 import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -28,7 +27,6 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.ForeachAction;
@@ -52,23 +50,16 @@ import org.junit.experimental.categories.Category;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 
 @Category({IntegrationTest.class})
 public class GlobalKTableIntegrationTest {
     private static final int NUM_BROKERS = 1;
-    private static final Properties BROKER_CONFIG;
-    static {
-        BROKER_CONFIG = new Properties();
-        BROKER_CONFIG.put("transaction.state.log.replication.factor", (short) 
1);
-        BROKER_CONFIG.put("transaction.state.log.min.isr", 1);
-    }
 
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER =
-            new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
+            new EmbeddedKafkaCluster(NUM_BROKERS);
 
     private static volatile int testNo = 0;
     private final MockTime mockTime = CLUSTER.time;
@@ -229,46 +220,14 @@ public class GlobalKTableIntegrationTest {
             }
         }, 30000L, "waiting for final values");
     }
-
-    @Test
-    public void shouldRestoreTransactionalMessages() throws Exception {
-        produceInitialGlobalTableValues(true);
-        startStreams();
-
-        final Map<Long, String> expected = new HashMap<>();
-        expected.put(1L, "A");
-        expected.put(2L, "B");
-        expected.put(3L, "C");
-        expected.put(4L, "D");
-
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                ReadOnlyKeyValueStore<Long, String> store = null;
-                try {
-                    store = kafkaStreams.store(globalStore, 
QueryableStoreTypes.<Long, String>keyValueStore());
-                } catch (InvalidStateStoreException ex) {
-                    return false;
-                }
-                Map<Long, String> result = new HashMap<>();
-                Iterator<KeyValue<Long, String>> it = store.all();
-                while (it.hasNext()) {
-                    KeyValue<Long, String> kv = it.next();
-                    result.put(kv.key, kv.value);
-                }
-                return result.equals(expected);
-            }
-        }, 30000L, "waiting for initial values");
-        System.out.println("no failed test");
-    }
-
+    
     private void createTopics() throws InterruptedException {
         streamTopic = "stream-" + testNo;
         globalTableTopic = "globalTable-" + testNo;
         CLUSTER.createTopics(streamTopic);
         CLUSTER.createTopic(globalTableTopic, 2, 1);
     }
-
+    
     private void startStreams() {
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.start();
@@ -292,29 +251,20 @@ public class GlobalKTableIntegrationTest {
     }
 
     private void produceInitialGlobalTableValues() throws Exception {
-        produceInitialGlobalTableValues(false);
-    }
-
-    private void produceInitialGlobalTableValues(final boolean 
enableTransactions) throws Exception {
-        Properties properties = new Properties();
-        if (enableTransactions) {
-            properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid");
-            properties.put(ProducerConfig.RETRIES_CONFIG, 1);
-        }
         IntegrationTestUtils.produceKeyValuesSynchronously(
                 globalTableTopic,
                 Arrays.asList(
                         new KeyValue<>(1L, "A"),
                         new KeyValue<>(2L, "B"),
                         new KeyValue<>(3L, "C"),
-                        new KeyValue<>(4L, "D")),
+                        new KeyValue<>(4L, "D")
+                        ),
                 TestUtils.producerConfig(
                         CLUSTER.bootstrapServers(),
                         LongSerializer.class,
-                        StringSerializer.class,
-                        properties),
-                mockTime,
-                enableTransactions);
+                        StringSerializer.class
+                        ),
+                mockTime);
     }
 
     private void produceGlobalTableValues() throws Exception {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 86cb331..2ab6639 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -180,16 +180,38 @@ public class IntegrationTestUtils {
             producer.flush();
         }
     }
+    
+    public static <K, V> void 
produceAbortedKeyValuesSynchronouslyWithTimestamp(final String topic,
+                                                                               
 final Collection<KeyValue<K, V>> records,
+                                                                               
 final Properties producerConfig,
+                                                                               
 final Long timestamp)
+        throws ExecutionException, InterruptedException {
+        try (final Producer<K, V> producer = new 
KafkaProducer<>(producerConfig)) {
+            producer.initTransactions();
+            for (final KeyValue<K, V> record : records) {
+                producer.beginTransaction();
+                final Future<RecordMetadata> f = producer
+                        .send(new ProducerRecord<>(topic, null, timestamp, 
record.key, record.value));
+                f.get();
+                producer.abortTransaction();
+            }
+        }    
+    }
 
-    public static <V> void produceValuesSynchronously(
-        final String topic, final Collection<V> records, final Properties 
producerConfig, final Time time)
+    public static <V> void produceValuesSynchronously(final String topic,
+                                                      final Collection<V> 
records,
+                                                      final Properties 
producerConfig,
+                                                      final Time time)
         throws ExecutionException, InterruptedException {
         IntegrationTestUtils.produceValuesSynchronously(topic, records, 
producerConfig, time, false);
     }
 
-    public static <V> void produceValuesSynchronously(
-        final String topic, final Collection<V> records, final Properties 
producerConfig, final Time time, final boolean enableTransactions)
-        throws ExecutionException, InterruptedException {
+    public static <V> void produceValuesSynchronously(final String topic,
+                                                      final Collection<V> 
records,
+                                                      final Properties 
producerConfig,
+                                                      final Time time,
+                                                      final boolean 
enableTransactions)
+            throws ExecutionException, InterruptedException {
         final Collection<KeyValue<Object, V>> keyedRecords = new ArrayList<>();
         for (final V value : records) {
             final KeyValue<Object, V> kv = new KeyValue<>(null, value);
@@ -241,10 +263,9 @@ public class IntegrationTestUtils {
     public static <K, V> List<KeyValue<K, V>> 
waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,
                                                                                
   final String topic,
                                                                                
   final int expectedNumRecords) throws InterruptedException {
-
         return waitUntilMinKeyValueRecordsReceived(consumerConfig, topic, 
expectedNumRecords, DEFAULT_TIMEOUT);
     }
-
+    
     /**
      * Wait until enough data (key-value records) has been consumed.
      *

-- 
To stop receiving notification emails like this one, please contact
mj...@apache.org.

Reply via email to