This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 5603d8b  KAFKA-7021: Reuse source based on config (#5163)
5603d8b is described below

commit 5603d8b4464cc5f07e8d4fdf0ff57421b7a2382f
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Mon Jun 11 16:08:24 2018 -0700

    KAFKA-7021: Reuse source based on config (#5163)
    
    This PR contains the following changes:
    
    1. Leverage the TOPOLOGY_OPTIMIZATION config to adjust the topology
    internally so that the source topic is reused as the changelog topic
    (see the config sketch below).
    
    2. Fix a long-dangling bug: whenever a source topic is reused as a
    changelog topic, the consumed offsets must also be written to the
    checkpoint file. This is done by taking the union of the acked offsets
    from the producer and the consumed offsets from the consumer, giving
    priority to the acked offsets since the same topic may show up in both
    (think of a repartition topic). By doing this, the consumed offsets of
    source topics can be treated as checkpointed offsets when reuse happens
    (see the merge sketch below).
    
    3. Add a few unit and integration tests with and without the reuse, and
    make sure the restoration, standby task, and internal topic creation
    behaviors are all correct.
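    
    For illustration only (not part of the patch): a minimal sketch of how an
    application would opt into this behavior. The application id, bootstrap
    servers, topic name and store name below are hypothetical.
    
        import java.util.Properties;
        
        import org.apache.kafka.common.serialization.Serdes;
        import org.apache.kafka.common.utils.Bytes;
        import org.apache.kafka.streams.KafkaStreams;
        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.StreamsConfig;
        import org.apache.kafka.streams.kstream.Consumed;
        import org.apache.kafka.streams.kstream.Materialized;
        import org.apache.kafka.streams.state.KeyValueStore;
        
        public class OptimizedTableExample {
            public static void main(final String[] args) {
                final Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "reuse-source-demo");
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                // opt into the optimization: the source topic is reused as the
                // changelog, so no "<app-id>-store-changelog" topic is created
                props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
        
                final StreamsBuilder builder = new StreamsBuilder();
                builder.table("input-topic",
                        Consumed.with(Serdes.Integer(), Serdes.String()),
                        Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as("store"));
        
                // the KafkaStreams constructor adjusts the topology based on the config
                final KafkaStreams streams = new KafkaStreams(builder.build(), props);
                streams.start();
                Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
            }
        }
    
    With OPTIMIZE set, the table's store restores directly from "input-topic";
    with the default NO_OPTIMIZATION, a separate changelog topic is still created.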
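    
    Also for illustration (the actual change lives in
    StreamTask#activeTaskCheckpointableOffsets and ProcessorStateManager#checkpoint):
    a minimal sketch of the merge rule described in point 2. The class and method
    names are hypothetical.
    
        import java.util.HashMap;
        import java.util.Map;
        
        import org.apache.kafka.common.TopicPartition;
        
        public final class CheckpointableOffsets {
        
            private CheckpointableOffsets() { }
        
            // union of producer-acked and consumer-consumed offsets; acked offsets
            // take priority because the same partition may appear in both maps
            // (e.g. a repartition topic the task both writes to and reads from)
            public static Map<TopicPartition, Long> merge(final Map<TopicPartition, Long> ackedOffsets,
                                                          final Map<TopicPartition, Long> consumedOffsets) {
                final Map<TopicPartition, Long> checkpointable = new HashMap<>(ackedOffsets);
                for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
                    checkpointable.putIfAbsent(entry.getKey(), entry.getValue());
                }
                return checkpointable;
            }
        }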
    
    Reviewers: John Roesler <j...@confluent.io>, Bill Bejeck 
<b...@confluent.io>, Matthias J. Sax <matth...@confluent.io>
---
 .../org/apache/kafka/streams/KafkaStreams.java     |   6 +-
 .../org/apache/kafka/streams/StreamsBuilder.java   |   7 +-
 .../java/org/apache/kafka/streams/Topology.java    |   1 -
 .../kstream/internals/InternalStreamsBuilder.java  |   8 +-
 .../streams/processor/internals/AbstractTask.java  |   4 +-
 .../internals/InternalTopologyBuilder.java         |  44 ++++++-
 .../processor/internals/ProcessorStateManager.java |  22 ++--
 .../processor/internals/StateDirectory.java        |   2 +-
 .../streams/processor/internals/StreamTask.java    |  11 +-
 .../apache/kafka/streams/StreamsBuilderTest.java   |  27 +++-
 .../integration/RestoreIntegrationTest.java        | 137 +++++++++++++++++----
 .../integration/TableTableJoinIntegrationTest.java |   4 +-
 .../internals/InternalStreamsBuilderTest.java      |   2 +-
 .../processor/internals/StreamThreadTest.java      |  32 ++---
 .../internals/StreamsPartitionAssignorTest.java    |   1 +
 15 files changed, 230 insertions(+), 78 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index d6002ff..6a707ff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -599,7 +599,7 @@ public class KafkaStreams {
     @Deprecated
     public KafkaStreams(final Topology topology,
                         final StreamsConfig config) {
-        this(topology.internalTopologyBuilder, config, new 
DefaultKafkaClientSupplier());
+        this(topology, config, new DefaultKafkaClientSupplier());
     }
 
     /**
@@ -635,6 +635,10 @@ public class KafkaStreams {
         this.config = config;
         this.time = time;
 
+        // adjust the topology if optimization is turned on.
+        // TODO: to be removed post 2.0
+        internalTopologyBuilder.adjust(config);
+
         // The application ID is a required config and hence should always 
have value
         processId = UUID.randomUUID();
         final String userClientId = 
config.getString(StreamsConfig.CLIENT_ID_CONFIG);
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 517104d..ae6d44c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -302,11 +302,10 @@ public class StreamsBuilder {
         Objects.requireNonNull(materialized, "materialized can't be null");
         final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> 
materializedInternal = new MaterializedInternal<>(materialized);
         materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder, 
topic + "-");
+        final ConsumedInternal<K, V> consumedInternal =
+                new 
ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(), 
materializedInternal.valueSerde()));
 
-        return internalStreamsBuilder.table(topic,
-                                            new 
ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(),
-                                                                               
  materializedInternal.valueSerde())),
-                                            materializedInternal);
+        return internalStreamsBuilder.table(topic, consumedInternal, 
materializedInternal);
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java 
b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 22f6ea8..753185c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -776,5 +776,4 @@ public class Topology {
     public synchronized TopologyDescription describe() {
         return internalTopologyBuilder.describe();
     }
-
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 0a19b4e..c7bf2fa 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -72,11 +72,7 @@ public class InternalStreamsBuilder implements 
InternalNameProvider {
     public <K, V> KTable<K, V> table(final String topic,
                                      final ConsumedInternal<K, V> consumed,
                                      final MaterializedInternal<K, V, 
KeyValueStore<Bytes, byte[]>> materialized) {
-        // explicitly disable logging for source table materialized stores
-        materialized.withLoggingDisabled();
-
-        final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new 
KeyValueStoreMaterializer<>(materialized)
-                .materialize();
+        final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new 
KeyValueStoreMaterializer<>(materialized).materialize();
 
         final String source = newProcessorName(KStreamImpl.SOURCE_NAME);
         final String name = newProcessorName(KTableImpl.SOURCE_NAME);
@@ -88,7 +84,7 @@ public class InternalStreamsBuilder implements 
InternalNameProvider {
                                                  name);
 
         internalTopologyBuilder.addStateStore(storeBuilder, name);
-        
internalTopologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topic);
+        internalTopologyBuilder.markSourceStoreAndTopic(storeBuilder, topic);
 
         return kTable;
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 02a1a06..188ff47 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -167,7 +167,7 @@ public abstract class AbstractTask implements Task {
         return sb.toString();
     }
 
-    protected Map<TopicPartition, Long> recordCollectorOffsets() {
+    protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
         return Collections.emptyMap();
     }
 
@@ -242,7 +242,7 @@ public abstract class AbstractTask implements Task {
         ProcessorStateException exception = null;
         log.trace("Closing state manager");
         try {
-            stateMgr.close(writeCheckpoint ? recordCollectorOffsets() : null);
+            stateMgr.close(writeCheckpoint ? activeTaskCheckpointableOffsets() 
: null);
         } catch (final ProcessorStateException e) {
             exception = e;
         } finally {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 7d09031..36a2edc 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -121,6 +122,9 @@ public class InternalTopologyBuilder {
 
     private Map<Integer, Set<String>> nodeGroups = null;
 
+    // TODO: this is only temporary for 2.0 and should be removed
+    public final Map<StoreBuilder, String> storeToSourceChangelogTopic = new 
HashMap<>();
+
     public interface StateStoreFactory {
         Set<String> users();
         boolean loggingEnabled();
@@ -498,8 +502,14 @@ public class InternalTopologyBuilder {
 
     public final void addStateStore(final StoreBuilder storeBuilder,
                                     final String... processorNames) {
+        addStateStore(storeBuilder, false, processorNames);
+    }
+
+    public final void addStateStore(final StoreBuilder storeBuilder,
+                                    final boolean allowOverride,
+                                    final String... processorNames) {
         Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
-        if (stateFactories.containsKey(storeBuilder.name())) {
+        if (!allowOverride && stateFactories.containsKey(storeBuilder.name())) 
{
             throw new TopologyException("StateStore " + storeBuilder.name() + 
" is already added.");
         }
 
@@ -566,16 +576,22 @@ public class InternalTopologyBuilder {
         }
     }
 
-    // TODO: this method is only used by DSL and we might want to refactor 
this part
     public final void connectSourceStoreAndTopic(final String sourceStoreName,
-                                                  final String topic) {
+                                                 final String topic) {
         if (storeToChangelogTopic.containsKey(sourceStoreName)) {
             throw new TopologyException("Source store " + sourceStoreName + " 
is already added.");
         }
         storeToChangelogTopic.put(sourceStoreName, topic);
     }
 
-    // TODO: this method is only used by DSL and we might want to refactor 
this part
+    public final void markSourceStoreAndTopic(final StoreBuilder storeBuilder,
+                                              final String topic) {
+        if (storeToSourceChangelogTopic.containsKey(storeBuilder)) {
+            throw new TopologyException("Source store " + storeBuilder.name() 
+ " is already used.");
+        }
+        storeToSourceChangelogTopic.put(storeBuilder, topic);
+    }
+
     public final void connectProcessors(final String... processorNames) {
         if (processorNames.length < 2) {
             throw new TopologyException("At least two processors need to 
participate in the connection.");
@@ -591,13 +607,11 @@ public class InternalTopologyBuilder {
         nodeGrouper.unite(processorNames[0], 
Arrays.copyOfRange(processorNames, 1, processorNames.length));
     }
 
-    // TODO: this method is only used by DSL and we might want to refactor 
this part
     public final void addInternalTopic(final String topicName) {
         Objects.requireNonNull(topicName, "topicName can't be null");
         internalTopicNames.add(topicName);
     }
 
-    // TODO: this method is only used by DSL and we might want to refactor 
this part
     public final void copartitionSources(final Collection<String> sourceNodes) 
{
         copartitionSourceGroups.add(Collections.unmodifiableSet(new 
HashSet<>(sourceNodes)));
     }
@@ -1059,6 +1073,24 @@ public class InternalTopologyBuilder {
         return Collections.unmodifiableMap(topicGroups);
     }
 
+    // Adjust the generated topology based on the configs.
+    // Not exposed as public API and should be removed post 2.0
+    public void adjust(final StreamsConfig config) {
+        final boolean enableOptimization20 = 
config.getString(StreamsConfig.TOPOLOGY_OPTIMIZATION).equals(StreamsConfig.OPTIMIZE);
+
+        if (enableOptimization20) {
+            for (final Map.Entry<StoreBuilder, String> entry : 
storeToSourceChangelogTopic.entrySet()) {
+                final StoreBuilder storeBuilder = entry.getKey();
+                final String topicName = entry.getValue();
+
+                // update store map to disable logging for this store
+                storeBuilder.withLoggingDisabled();
+                addStateStore(storeBuilder, true);
+                connectSourceStoreAndTopic(storeBuilder.name(), topicName);
+            }
+        }
+    }
+
     private void setRegexMatchedTopicsToSourceNodes() {
         if (subscriptionUpdates.hasUpdates()) {
             for (final Map.Entry<String, Pattern> stringPatternEntry : 
nodeToSourcePatterns.entrySet()) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index e7a23bd..054333b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -46,7 +46,7 @@ public class ProcessorStateManager extends 
AbstractStateManager {
     private final boolean isStandby;
     private final ChangelogReader changelogReader;
     private final Map<TopicPartition, Long> offsetLimits;
-    private final Map<TopicPartition, Long> restoredOffsets;
+    private final Map<TopicPartition, Long> standbyRestoredOffsets;
     private final Map<String, StateRestoreCallback> restoreCallbacks; // used 
for standby tasks, keyed by state topic name
     private final Map<String, String> storeToChangelogTopic;
     private final List<TopicPartition> changelogPartitions = new ArrayList<>();
@@ -79,7 +79,7 @@ public class ProcessorStateManager extends 
AbstractStateManager {
             partitionForTopic.put(source.topic(), source);
         }
         offsetLimits = new HashMap<>();
-        restoredOffsets = new HashMap<>();
+        standbyRestoredOffsets = new HashMap<>();
         this.isStandby = isStandby;
         restoreCallbacks = isStandby ? new HashMap<String, 
StateRestoreCallback>() : null;
         this.storeToChangelogTopic = storeToChangelogTopic;
@@ -212,7 +212,7 @@ public class ProcessorStateManager extends 
AbstractStateManager {
         }
 
         // record the restored offset for its change log partition
-        restoredOffsets.put(storePartition, lastOffset + 1);
+        standbyRestoredOffsets.put(storePartition, lastOffset + 1);
 
         return remainingRecords;
     }
@@ -293,8 +293,8 @@ public class ProcessorStateManager extends 
AbstractStateManager {
 
     // write the checkpoint
     @Override
-    public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
-        checkpointableOffsets.putAll(changelogReader.restoredOffsets());
+    public void checkpoint(final Map<TopicPartition, Long> 
checkpointableOffsets) {
+        this.checkpointableOffsets.putAll(changelogReader.restoredOffsets());
         for (final StateStore store : stores.values()) {
             final String storeName = store.name();
             // only checkpoint the offset to the offsets file if
@@ -302,11 +302,11 @@ public class ProcessorStateManager extends 
AbstractStateManager {
             if (store.persistent() && 
storeToChangelogTopic.containsKey(storeName)) {
                 final String changelogTopic = 
storeToChangelogTopic.get(storeName);
                 final TopicPartition topicPartition = new 
TopicPartition(changelogTopic, getPartition(storeName));
-                if (ackedOffsets.containsKey(topicPartition)) {
+                if (checkpointableOffsets.containsKey(topicPartition)) {
                     // store the last offset + 1 (the log position after 
restoration)
-                    checkpointableOffsets.put(topicPartition, 
ackedOffsets.get(topicPartition) + 1);
-                } else if (restoredOffsets.containsKey(topicPartition)) {
-                    checkpointableOffsets.put(topicPartition, 
restoredOffsets.get(topicPartition));
+                    this.checkpointableOffsets.put(topicPartition, 
checkpointableOffsets.get(topicPartition) + 1);
+                } else if (standbyRestoredOffsets.containsKey(topicPartition)) 
{
+                    this.checkpointableOffsets.put(topicPartition, 
standbyRestoredOffsets.get(topicPartition));
                 }
             }
         }
@@ -315,9 +315,9 @@ public class ProcessorStateManager extends 
AbstractStateManager {
             checkpoint = new OffsetCheckpoint(new File(baseDir, 
CHECKPOINT_FILE_NAME));
         }
 
-        log.trace("Writing checkpoint: {}", checkpointableOffsets);
+        log.trace("Writing checkpoint: {}", this.checkpointableOffsets);
         try {
-            checkpoint.write(checkpointableOffsets);
+            checkpoint.write(this.checkpointableOffsets);
         } catch (final IOException e) {
             log.warn("Failed to write offset checkpoint file to {}: {}", 
checkpoint, e);
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index c33ade6..7623c66 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -91,7 +91,7 @@ public class StateDirectory {
      * @return directory for the {@link TaskId}
      * @throws ProcessorStateException if the task directory does not exists 
and could not be created
      */
-    File directoryForTask(final TaskId taskId) {
+    public File directoryForTask(final TaskId taskId) {
         final File taskDir = new File(stateDir, taskId.toString());
         if (!taskDir.exists() && !taskDir.mkdir()) {
             throw new ProcessorStateException(
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 4cea528..805b4c0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -380,7 +380,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
         flushState();
 
         if (!eosEnabled) {
-            stateMgr.checkpoint(recordCollectorOffsets());
+            stateMgr.checkpoint(activeTaskCheckpointableOffsets());
         }
 
         commitOffsets(startNewTransaction);
@@ -391,8 +391,13 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
     }
 
     @Override
-    protected Map<TopicPartition, Long> recordCollectorOffsets() {
-        return recordCollector.offsets();
+    protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
+        final Map<TopicPartition, Long> checkpointableOffsets = 
recordCollector.offsets();
+        for (final Map.Entry<TopicPartition, Long> entry : 
consumedOffsets.entrySet()) {
+            checkpointableOffsets.putIfAbsent(entry.getKey(), 
entry.getValue());
+        }
+
+        return checkpointableOffsets;
     }
 
     @Override
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 37101de..3b8c9bd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -273,11 +273,17 @@ public class StreamsBuilderTest {
     }
 
     @Test
-    public void shouldReuseSourceTopicAsChangelogs() {
+    public void shouldReuseSourceTopicAsChangelogsWithOptimization20() {
         final String topic = "topic";
         builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("store"));
+        final Topology topology = builder.build();
+        final Properties props = StreamsTestUtils.minimalStreamsConfig();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
 
-        final InternalTopologyBuilder internalTopologyBuilder = 
TopologyWrapper.getInternalTopologyBuilder(builder.build());
+        final InternalTopologyBuilder internalTopologyBuilder = 
TopologyWrapper.getInternalTopologyBuilder(topology);
+        internalTopologyBuilder.adjust(new StreamsConfig(props));
+
+        assertThat(internalTopologyBuilder.build().storeToChangelogTopic(), 
equalTo(Collections.singletonMap("store", "topic")));
 
         assertThat(internalTopologyBuilder.getStateStores().keySet(), 
equalTo(Collections.singleton("store")));
 
@@ -285,6 +291,23 @@ public class StreamsBuilderTest {
 
         
assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.isEmpty(),
 equalTo(true));
     }
+
+    @Test
+    public void shouldNotReuseSourceTopicAsChangelogsByDefault() {
+        final String topic = "topic";
+        builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, 
byte[]>>as("store"));
+
+        final InternalTopologyBuilder internalTopologyBuilder = 
TopologyWrapper.getInternalTopologyBuilder(builder.build());
+        internalTopologyBuilder.setApplicationId("appId");
+
+        assertThat(internalTopologyBuilder.build().storeToChangelogTopic(), 
equalTo(Collections.singletonMap("store", "appId-store-changelog")));
+
+        assertThat(internalTopologyBuilder.getStateStores().keySet(), 
equalTo(Collections.singleton("store")));
+
+        
assertThat(internalTopologyBuilder.getStateStores().get("store").loggingEnabled(),
 equalTo(true));
+
+        
assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.keySet(),
 equalTo(Collections.singleton("appId-store-changelog")));
+    }
     
     @Test(expected = TopologyException.class)
     public void shouldThrowExceptionWhenNoTopicPresent() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index f6d36f7..dbf85fa 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.integration;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -28,6 +27,7 @@ import 
org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
@@ -44,11 +44,14 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -57,10 +60,10 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.io.File;
 import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -76,6 +79,8 @@ import static org.junit.Assert.assertTrue;
 public class RestoreIntegrationTest {
     private static final int NUM_BROKERS = 1;
 
+    private static final String APPID = "restore-test";
+
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER =
             new EmbeddedKafkaCluster(NUM_BROKERS);
@@ -83,24 +88,24 @@ public class RestoreIntegrationTest {
     private static final String INPUT_STREAM_2 = "input-stream-2";
     private final int numberOfKeys = 10000;
     private KafkaStreams kafkaStreams;
-    private String applicationId = "restore-test";
-
 
     @BeforeClass
     public static void createTopics() throws InterruptedException {
         CLUSTER.createTopic(INPUT_STREAM, 2, 1);
         CLUSTER.createTopic(INPUT_STREAM_2, 2, 1);
+        CLUSTER.createTopic(APPID + "-store-changelog", 2, 1);
     }
 
     private Properties props(final String applicationId) {
         Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, 
applicationId);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
-        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+        
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory(applicationId).getPath());
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Integer().getClass());
         
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
1000);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
         return streamsConfiguration;
     }
 
@@ -112,24 +117,106 @@ public class RestoreIntegrationTest {
     }
 
     @Test
-    public void shouldRestoreState() throws ExecutionException, 
InterruptedException {
+    public void shouldRestoreStateFromSourceTopic() throws Exception {
         final AtomicInteger numReceived = new AtomicInteger(0);
         final StreamsBuilder builder = new StreamsBuilder();
 
-        createStateForRestoration();
+        final Properties props = props(APPID);
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+
+        // restoring from 1000 to 4000 (committed), and then process from 4000 
to 5000 on each of the two partitions
+        final int offsetLimitDelta = 1000;
+        final int offsetCheckpointed = 1000;
+        createStateForRestoration(INPUT_STREAM);
+        setCommittedOffset(INPUT_STREAM, offsetLimitDelta);
+
+        final StateDirectory stateDirectory = new StateDirectory(new 
StreamsConfig(props), new MockTime());
+        new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new 
TaskId(0, 0)), ".checkpoint"))
+                .write(Collections.singletonMap(new 
TopicPartition(INPUT_STREAM, 0), (long) offsetCheckpointed));
+        new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new 
TaskId(0, 1)), ".checkpoint"))
+                .write(Collections.singletonMap(new 
TopicPartition(INPUT_STREAM, 1), (long) offsetCheckpointed));
+
+        final CountDownLatch startupLatch = new CountDownLatch(1);
+        final CountDownLatch shutdownLatch = new CountDownLatch(1);
 
         builder.table(INPUT_STREAM, Consumed.with(Serdes.Integer(), 
Serdes.Integer()))
                 .toStream()
                 .foreach(new ForeachAction<Integer, Integer>() {
                     @Override
                     public void apply(final Integer key, final Integer value) {
-                        numReceived.incrementAndGet();
+                        if (numReceived.incrementAndGet() == 2 * 
offsetLimitDelta)
+                            shutdownLatch.countDown();
                     }
                 });
 
+        kafkaStreams = new KafkaStreams(builder.build(), props);
+        kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
+            @Override
+            public void onChange(final KafkaStreams.State newState, final 
KafkaStreams.State oldState) {
+                if (newState == KafkaStreams.State.RUNNING && oldState == 
KafkaStreams.State.REBALANCING) {
+                    startupLatch.countDown();
+                }
+            }
+        });
+
+        final AtomicLong restored = new AtomicLong(0);
+        kafkaStreams.setGlobalStateRestoreListener(new StateRestoreListener() {
+            @Override
+            public void onRestoreStart(final TopicPartition topicPartition, 
final String storeName, final long startingOffset, final long endingOffset) {
+
+            }
+
+            @Override
+            public void onBatchRestored(final TopicPartition topicPartition, 
final String storeName, final long batchEndOffset, final long numRestored) {
+
+            }
+
+            @Override
+            public void onRestoreEnd(final TopicPartition topicPartition, 
final String storeName, final long totalRestored) {
+                restored.addAndGet(totalRestored);
+            }
+        });
+        kafkaStreams.start();
+
+        assertTrue(startupLatch.await(30, TimeUnit.SECONDS));
+        assertThat(restored.get(), equalTo((long) numberOfKeys - 
offsetLimitDelta * 2 - offsetCheckpointed * 2));
+
+        assertTrue(shutdownLatch.await(30, TimeUnit.SECONDS));
+        assertThat(numReceived.get(), equalTo(offsetLimitDelta * 2));
+    }
+
+    @Test
+    public void shouldRestoreStateFromChangelogTopic() throws Exception {
+        final AtomicInteger numReceived = new AtomicInteger(0);
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final Properties props = props(APPID);
+
+        // restoring from 1000 to 5000, and then process from 5000 to 10000 on 
each of the two partitions
+        final int offsetCheckpointed = 1000;
+        createStateForRestoration(APPID + "-store-changelog");
+        createStateForRestoration(INPUT_STREAM);
+
+        final StateDirectory stateDirectory = new StateDirectory(new 
StreamsConfig(props), new MockTime());
+        new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new 
TaskId(0, 0)), ".checkpoint"))
+                .write(Collections.singletonMap(new TopicPartition(APPID + 
"-store-changelog", 0), (long) offsetCheckpointed));
+        new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new 
TaskId(0, 1)), ".checkpoint"))
+                .write(Collections.singletonMap(new TopicPartition(APPID + 
"-store-changelog", 1), (long) offsetCheckpointed));
 
         final CountDownLatch startupLatch = new CountDownLatch(1);
-        kafkaStreams = new KafkaStreams(builder.build(), props(applicationId));
+        final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+        builder.table(INPUT_STREAM, Consumed.with(Serdes.Integer(), 
Serdes.Integer()), Materialized.as("store"))
+                .toStream()
+                .foreach(new ForeachAction<Integer, Integer>() {
+                    @Override
+                    public void apply(final Integer key, final Integer value) {
+                        if (numReceived.incrementAndGet() == numberOfKeys)
+                            shutdownLatch.countDown();
+                    }
+                });
+
+        kafkaStreams = new KafkaStreams(builder.build(), props);
         kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
             @Override
             public void onChange(final KafkaStreams.State newState, final 
KafkaStreams.State oldState) {
@@ -159,8 +246,10 @@ public class RestoreIntegrationTest {
         kafkaStreams.start();
 
         assertTrue(startupLatch.await(30, TimeUnit.SECONDS));
-        assertThat(restored.get(), equalTo((long) numberOfKeys));
-        assertThat(numReceived.get(), equalTo(0));
+        assertThat(restored.get(), equalTo((long) numberOfKeys - 2 * 
offsetCheckpointed));
+
+        assertTrue(shutdownLatch.await(30, TimeUnit.SECONDS));
+        assertThat(numReceived.get(), equalTo(numberOfKeys));
     }
 
 
@@ -178,7 +267,7 @@ public class RestoreIntegrationTest {
                 }, Materialized.<Integer, Integer, KeyValueStore<Bytes, 
byte[]>>as("reduce-store").withLoggingDisabled());
 
         final CountDownLatch startupLatch = new CountDownLatch(1);
-        kafkaStreams = new KafkaStreams(builder.build(), props(applicationId));
+        kafkaStreams = new KafkaStreams(builder.build(), props(APPID));
         kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
             @Override
             public void onChange(final KafkaStreams.State newState, final 
KafkaStreams.State oldState) {
@@ -228,7 +317,7 @@ public class RestoreIntegrationTest {
 
         final Topology topology = streamsBuilder.build();
 
-        kafkaStreams = new KafkaStreams(topology, props(applicationId + 
"-logging-disabled"));
+        kafkaStreams = new KafkaStreams(topology, props(APPID + 
"-logging-disabled"));
 
         final CountDownLatch latch = new CountDownLatch(1);
         kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
@@ -279,8 +368,7 @@ public class RestoreIntegrationTest {
         }
     }
     
-    private void createStateForRestoration()
-            throws ExecutionException, InterruptedException {
+    private void createStateForRestoration(final String changelogTopic) {
         final Properties producerConfig = new Properties();
         producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
 
@@ -288,30 +376,33 @@ public class RestoreIntegrationTest {
                      new KafkaProducer<>(producerConfig, new 
IntegerSerializer(), new IntegerSerializer())) {
 
             for (int i = 0; i < numberOfKeys; i++) {
-                producer.send(new ProducerRecord<>(INPUT_STREAM, i, i));
+                producer.send(new ProducerRecord<>(changelogTopic, i, i));
             }
         }
+    }
 
+    private void setCommittedOffset(final String topic, final int limitDelta) {
         final Properties consumerConfig = new Properties();
         consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
-        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, applicationId);
+        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, APPID);
+        consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "commit-consumer");
         consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
IntegerDeserializer.class);
         consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
IntegerDeserializer.class);
 
         final Consumer consumer = new KafkaConsumer(consumerConfig);
-        final List<TopicPartition> partitions = Arrays.asList(new 
TopicPartition(INPUT_STREAM, 0),
-                                                              new 
TopicPartition(INPUT_STREAM, 1));
+        final List<TopicPartition> partitions = Arrays.asList(
+            new TopicPartition(topic, 0),
+            new TopicPartition(topic, 1));
 
         consumer.assign(partitions);
         consumer.seekToEnd(partitions);
 
-        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         for (TopicPartition partition : partitions) {
             final long position = consumer.position(partition);
-            offsets.put(partition, new OffsetAndMetadata(position + 1));
+            consumer.seek(partition, position - limitDelta);
         }
 
-        consumer.commitSync(offsets);
+        consumer.commitSync();
         consumer.close();
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
index b5e6fcb..5fab666 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
@@ -55,8 +55,8 @@ public class TableTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
         appID = "table-table-join-integration-test";
 
         builder = new StreamsBuilder();
-        leftTable = builder.table(INPUT_TOPIC_LEFT);
-        rightTable = builder.table(INPUT_TOPIC_RIGHT);
+        leftTable = builder.table(INPUT_TOPIC_LEFT, Materialized.<Long, 
String, KeyValueStore<Bytes, byte[]>>as("left").withLoggingDisabled());
+        rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.<Long, 
String, KeyValueStore<Bytes, byte[]>>as("right").withLoggingDisabled());
     }
 
     final private String expectedFinalJoinResult = "D-d";
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 63432ff..ef3fcd6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -137,7 +137,7 @@ public class InternalStreamsBuilderTest {
         assertEquals(storeName, topology.stateStores().get(0).name());
 
         assertEquals(1, topology.storeToChangelogTopic().size());
-        assertEquals("topic2", 
topology.storeToChangelogTopic().get(storeName));
+        assertEquals("app-id-prefix-STATE-STORE-0000000000-changelog", 
topology.storeToChangelogTopic().get(storeName));
         assertNull(table1.queryableStoreName());
     }
     
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 1cc9c06..3412c62 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -21,7 +21,6 @@ import 
org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.consumer.MockConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.Cluster;
@@ -59,6 +58,7 @@ import org.apache.kafka.streams.processor.TaskMetadata;
 import org.apache.kafka.streams.processor.ThreadMetadata;
 import 
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.MockTimestampExtractor;
@@ -69,6 +69,8 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -84,6 +86,7 @@ import static java.util.Collections.singletonList;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static 
org.apache.kafka.streams.processor.internals.AbstractStateManager.CHECKPOINT_FILE_NAME;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -821,12 +824,13 @@ public class StreamThreadTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldUpdateStandbyTask() {
+    public void shouldUpdateStandbyTask() throws IOException {
         final String storeName1 = "count-one";
         final String storeName2 = "table-two";
-        final String changelogName = applicationId + "-" + storeName1 + 
"-changelog";
-        final TopicPartition partition1 = new TopicPartition(changelogName, 1);
-        final TopicPartition partition2 = t2p1;
+        final String changelogName1 = applicationId + "-" + storeName1 + 
"-changelog";
+        final String changelogName2 = applicationId + "-" + storeName2 + 
"-changelog";
+        final TopicPartition partition1 = new TopicPartition(changelogName1, 
1);
+        final TopicPartition partition2 = new TopicPartition(changelogName2, 
1);
         internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
             .groupByKey().count(Materialized.<Object, Long, 
KeyValueStore<Bytes, byte[]>>as(storeName1));
         final MaterializedInternal materialized = new 
MaterializedInternal(Materialized.as(storeName2));
@@ -835,10 +839,10 @@ public class StreamThreadTest {
 
         final StreamThread thread = createStreamThread(clientId, config, 
false);
         final MockConsumer<byte[], byte[]> restoreConsumer = 
clientSupplier.restoreConsumer;
-        restoreConsumer.updatePartitions(changelogName,
+        restoreConsumer.updatePartitions(changelogName1,
             singletonList(
                 new PartitionInfo(
-                    changelogName,
+                    changelogName1,
                     1,
                     null,
                     new Node[0],
@@ -852,13 +856,13 @@ public class StreamThreadTest {
         
restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition1, 
0L));
         restoreConsumer.updateEndOffsets(Collections.singletonMap(partition2, 
10L));
         
restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition2, 
0L));
-        // let the store1 be restored from 0 to 10; store2 be restored from 0 
to (committed offset) 5
-        clientSupplier.consumer.assign(Utils.mkSet(partition2));
-        
clientSupplier.consumer.commitSync(Collections.singletonMap(partition2, new 
OffsetAndMetadata(5L, "")));
+        // let the store1 be restored from 0 to 10; store2 be restored from 5 
(checkpointed) to 10
+        OffsetCheckpoint checkpoint = new OffsetCheckpoint(new 
File(stateDirectory.directoryForTask(task3), CHECKPOINT_FILE_NAME));
+        checkpoint.write(Collections.singletonMap(partition2, 5L));
 
         for (long i = 0L; i < 10L; i++) {
-            restoreConsumer.addRecord(new ConsumerRecord<>(changelogName, 1, 
i, ("K" + i).getBytes(), ("V" + i).getBytes()));
-            restoreConsumer.addRecord(new ConsumerRecord<>(topic2, 1, i, ("K" 
+ i).getBytes(), ("V" + i).getBytes()));
+            restoreConsumer.addRecord(new ConsumerRecord<>(changelogName1, 1, 
i, ("K" + i).getBytes(), ("V" + i).getBytes()));
+            restoreConsumer.addRecord(new ConsumerRecord<>(changelogName2, 1, 
i, ("K" + i).getBytes(), ("V" + i).getBytes()));
         }
 
         thread.setState(StreamThread.State.RUNNING);
@@ -884,9 +888,7 @@ public class StreamThreadTest {
 
         assertEquals(10L, store1.approximateNumEntries());
         assertEquals(5L, store2.approximateNumEntries());
-        assertEquals(Collections.singleton(partition2), 
restoreConsumer.paused());
-        assertEquals(1, thread.standbyRecords().size());
-        assertEquals(5, thread.standbyRecords().get(partition2).size());
+        assertEquals(0, thread.standbyRecords().size());
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index a32d193..4327e8f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -799,6 +799,7 @@ public class StreamsPartitionAssignorTest {
         final Map<String, Integer> expectedCreatedInternalTopics = new 
HashMap<>();
         expectedCreatedInternalTopics.put(applicationId + 
"-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 4);
         expectedCreatedInternalTopics.put(applicationId + 
"-KTABLE-AGGREGATE-STATE-STORE-0000000006-changelog", 4);
+        expectedCreatedInternalTopics.put(applicationId + 
"-topic3-STATE-STORE-0000000002-changelog", 4);
         expectedCreatedInternalTopics.put(applicationId + 
"-KSTREAM-MAP-0000000001-repartition", 4);
 
         // check if all internal topics were created as expected

-- 
To stop receiving notification emails like this one, please contact
guozh...@apache.org.
