[
https://issues.apache.org/jira/browse/KAFKA-7021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508927#comment-16508927
]
ASF GitHub Bot commented on KAFKA-7021:
---------------------------------------
guozhangwang closed pull request #5163: KAFKA-7021: Reuse source based on config
URL: https://github.com/apache/kafka/pull/5163
This is a PR merged from a forked repository.
Because GitHub hides the original diff once a PR is merged, the diff is
reproduced below for the sake of provenance:
Because this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise display it):
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index d6002ff016b..6a707ff986d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -599,7 +599,7 @@ public KafkaStreams(final Topology topology,
@Deprecated
public KafkaStreams(final Topology topology,
final StreamsConfig config) {
- this(topology.internalTopologyBuilder, config, new
DefaultKafkaClientSupplier());
+ this(topology, config, new DefaultKafkaClientSupplier());
}
/**
@@ -635,6 +635,10 @@ private KafkaStreams(final InternalTopologyBuilder
internalTopologyBuilder,
this.config = config;
this.time = time;
+ // adjust the topology if optimization is turned on.
+ // TODO: to be removed post 2.0
+ internalTopologyBuilder.adjust(config);
+
// The application ID is a required config and hence should always
have value
processId = UUID.randomUUID();
final String userClientId =
config.getString(StreamsConfig.CLIENT_ID_CONFIG);
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 517104da323..ae6d44c449e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -302,11 +302,10 @@
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>
materializedInternal = new MaterializedInternal<>(materialized);
materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder,
topic + "-");
+ final ConsumedInternal<K, V> consumedInternal =
+ new
ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(),
materializedInternal.valueSerde()));
- return internalStreamsBuilder.table(topic,
- new
ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(),
-
materializedInternal.valueSerde())),
- materializedInternal);
+ return internalStreamsBuilder.table(topic, consumedInternal,
materializedInternal);
}
/**
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java
b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 22f6ea8362b..753185c2164 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -776,5 +776,4 @@ public synchronized Topology
connectProcessorAndStateStores(final String process
public synchronized TopologyDescription describe() {
return internalTopologyBuilder.describe();
}
-
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 0a19b4eb0c0..c7bf2fac8f7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -72,11 +72,7 @@ public InternalStreamsBuilder(final InternalTopologyBuilder
internalTopologyBuil
public <K, V> KTable<K, V> table(final String topic,
final ConsumedInternal<K, V> consumed,
final MaterializedInternal<K, V,
KeyValueStore<Bytes, byte[]>> materialized) {
- // explicitly disable logging for source table materialized stores
- materialized.withLoggingDisabled();
-
- final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new
KeyValueStoreMaterializer<>(materialized)
- .materialize();
+ final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new
KeyValueStoreMaterializer<>(materialized).materialize();
final String source = newProcessorName(KStreamImpl.SOURCE_NAME);
final String name = newProcessorName(KTableImpl.SOURCE_NAME);
@@ -88,7 +84,7 @@ public InternalStreamsBuilder(final InternalTopologyBuilder
internalTopologyBuil
name);
internalTopologyBuilder.addStateStore(storeBuilder, name);
-
internalTopologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topic);
+ internalTopologyBuilder.markSourceStoreAndTopic(storeBuilder, topic);
return kTable;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 02a1a066ab8..188ff473038 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -167,7 +167,7 @@ public String toString(final String indent) {
return sb.toString();
}
- protected Map<TopicPartition, Long> recordCollectorOffsets() {
+ protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
return Collections.emptyMap();
}
@@ -242,7 +242,7 @@ void closeStateManager(final boolean writeCheckpoint)
throws ProcessorStateExcep
ProcessorStateException exception = null;
log.trace("Closing state manager");
try {
- stateMgr.close(writeCheckpoint ? recordCollectorOffsets() : null);
+ stateMgr.close(writeCheckpoint ? activeTaskCheckpointableOffsets()
: null);
} catch (final ProcessorStateException e) {
exception = e;
} finally {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 7d09031d713..36a2edc6766 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -19,6 +19,7 @@
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -121,6 +122,9 @@
private Map<Integer, Set<String>> nodeGroups = null;
+ // TODO: this is only temporary for 2.0 and should be removed
+ public final Map<StoreBuilder, String> storeToSourceChangelogTopic = new
HashMap<>();
+
public interface StateStoreFactory {
Set<String> users();
boolean loggingEnabled();
@@ -498,8 +502,14 @@ public final void addProcessor(final String name,
public final void addStateStore(final StoreBuilder storeBuilder,
final String... processorNames) {
+ addStateStore(storeBuilder, false, processorNames);
+ }
+
+ public final void addStateStore(final StoreBuilder storeBuilder,
+ final boolean allowOverride,
+ final String... processorNames) {
Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
- if (stateFactories.containsKey(storeBuilder.name())) {
+ if (!allowOverride && stateFactories.containsKey(storeBuilder.name()))
{
throw new TopologyException("StateStore " + storeBuilder.name() +
" is already added.");
}
@@ -566,16 +576,22 @@ public final void connectProcessorAndStateStores(final
String processorName,
}
}
- // TODO: this method is only used by DSL and we might want to refactor
this part
public final void connectSourceStoreAndTopic(final String sourceStoreName,
- final String topic) {
+ final String topic) {
if (storeToChangelogTopic.containsKey(sourceStoreName)) {
throw new TopologyException("Source store " + sourceStoreName + "
is already added.");
}
storeToChangelogTopic.put(sourceStoreName, topic);
}
- // TODO: this method is only used by DSL and we might want to refactor
this part
+ public final void markSourceStoreAndTopic(final StoreBuilder storeBuilder,
+ final String topic) {
+ if (storeToSourceChangelogTopic.containsKey(storeBuilder)) {
+ throw new TopologyException("Source store " + storeBuilder.name()
+ " is already used.");
+ }
+ storeToSourceChangelogTopic.put(storeBuilder, topic);
+ }
+
public final void connectProcessors(final String... processorNames) {
if (processorNames.length < 2) {
throw new TopologyException("At least two processors need to
participate in the connection.");
@@ -591,13 +607,11 @@ public final void connectProcessors(final String...
processorNames) {
nodeGrouper.unite(processorNames[0],
Arrays.copyOfRange(processorNames, 1, processorNames.length));
}
- // TODO: this method is only used by DSL and we might want to refactor
this part
public final void addInternalTopic(final String topicName) {
Objects.requireNonNull(topicName, "topicName can't be null");
internalTopicNames.add(topicName);
}
- // TODO: this method is only used by DSL and we might want to refactor
this part
public final void copartitionSources(final Collection<String> sourceNodes)
{
copartitionSourceGroups.add(Collections.unmodifiableSet(new
HashSet<>(sourceNodes)));
}
@@ -1059,6 +1073,24 @@ private void buildProcessorNode(final Map<String,
ProcessorNode> processorMap,
return Collections.unmodifiableMap(topicGroups);
}
+ // Adjust the generated topology based on the configs.
+ // Not exposed as public API and should be removed post 2.0
+ public void adjust(final StreamsConfig config) {
+ final boolean enableOptimization20 =
config.getString(StreamsConfig.TOPOLOGY_OPTIMIZATION).equals(StreamsConfig.OPTIMIZE);
+
+ if (enableOptimization20) {
+ for (final Map.Entry<StoreBuilder, String> entry :
storeToSourceChangelogTopic.entrySet()) {
+ final StoreBuilder storeBuilder = entry.getKey();
+ final String topicName = entry.getValue();
+
+ // update store map to disable logging for this store
+ storeBuilder.withLoggingDisabled();
+ addStateStore(storeBuilder, true);
+ connectSourceStoreAndTopic(storeBuilder.name(), topicName);
+ }
+ }
+ }
+
private void setRegexMatchedTopicsToSourceNodes() {
if (subscriptionUpdates.hasUpdates()) {
for (final Map.Entry<String, Pattern> stringPatternEntry :
nodeToSourcePatterns.entrySet()) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index e7a23bd4b5f..054333b7a8f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -46,7 +46,7 @@
private final boolean isStandby;
private final ChangelogReader changelogReader;
private final Map<TopicPartition, Long> offsetLimits;
- private final Map<TopicPartition, Long> restoredOffsets;
+ private final Map<TopicPartition, Long> standbyRestoredOffsets;
private final Map<String, StateRestoreCallback> restoreCallbacks; // used
for standby tasks, keyed by state topic name
private final Map<String, String> storeToChangelogTopic;
private final List<TopicPartition> changelogPartitions = new ArrayList<>();
@@ -79,7 +79,7 @@ public ProcessorStateManager(final TaskId taskId,
partitionForTopic.put(source.topic(), source);
}
offsetLimits = new HashMap<>();
- restoredOffsets = new HashMap<>();
+ standbyRestoredOffsets = new HashMap<>();
this.isStandby = isStandby;
restoreCallbacks = isStandby ? new HashMap<String,
StateRestoreCallback>() : null;
this.storeToChangelogTopic = storeToChangelogTopic;
@@ -212,7 +212,7 @@ public void reinitializeStateStoresForPartitions(final
Collection<TopicPartition
}
// record the restored offset for its change log partition
- restoredOffsets.put(storePartition, lastOffset + 1);
+ standbyRestoredOffsets.put(storePartition, lastOffset + 1);
return remainingRecords;
}
@@ -293,8 +293,8 @@ public void close(final Map<TopicPartition, Long>
ackedOffsets) throws Processor
// write the checkpoint
@Override
- public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
- checkpointableOffsets.putAll(changelogReader.restoredOffsets());
+ public void checkpoint(final Map<TopicPartition, Long>
checkpointableOffsets) {
+ this.checkpointableOffsets.putAll(changelogReader.restoredOffsets());
for (final StateStore store : stores.values()) {
final String storeName = store.name();
// only checkpoint the offset to the offsets file if
@@ -302,11 +302,11 @@ public void checkpoint(final Map<TopicPartition, Long>
ackedOffsets) {
if (store.persistent() &&
storeToChangelogTopic.containsKey(storeName)) {
final String changelogTopic =
storeToChangelogTopic.get(storeName);
final TopicPartition topicPartition = new
TopicPartition(changelogTopic, getPartition(storeName));
- if (ackedOffsets.containsKey(topicPartition)) {
+ if (checkpointableOffsets.containsKey(topicPartition)) {
// store the last offset + 1 (the log position after
restoration)
- checkpointableOffsets.put(topicPartition,
ackedOffsets.get(topicPartition) + 1);
- } else if (restoredOffsets.containsKey(topicPartition)) {
- checkpointableOffsets.put(topicPartition,
restoredOffsets.get(topicPartition));
+ this.checkpointableOffsets.put(topicPartition,
checkpointableOffsets.get(topicPartition) + 1);
+ } else if (standbyRestoredOffsets.containsKey(topicPartition))
{
+ this.checkpointableOffsets.put(topicPartition,
standbyRestoredOffsets.get(topicPartition));
}
}
}
@@ -315,9 +315,9 @@ public void checkpoint(final Map<TopicPartition, Long>
ackedOffsets) {
checkpoint = new OffsetCheckpoint(new File(baseDir,
CHECKPOINT_FILE_NAME));
}
- log.trace("Writing checkpoint: {}", checkpointableOffsets);
+ log.trace("Writing checkpoint: {}", this.checkpointableOffsets);
try {
- checkpoint.write(checkpointableOffsets);
+ checkpoint.write(this.checkpointableOffsets);
} catch (final IOException e) {
log.warn("Failed to write offset checkpoint file to {}: {}",
checkpoint, e);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index c33ade6f36e..7623c66cd3b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -91,7 +91,7 @@ public StateDirectory(final StreamsConfig config,
* @return directory for the {@link TaskId}
* @throws ProcessorStateException if the task directory does not exists
and could not be created
*/
- File directoryForTask(final TaskId taskId) {
+ public File directoryForTask(final TaskId taskId) {
final File taskDir = new File(stateDir, taskId.toString());
if (!taskDir.exists() && !taskDir.mkdir()) {
throw new ProcessorStateException(
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index e2be3e29172..9493493973e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -380,7 +380,7 @@ void commit(final boolean startNewTransaction) {
flushState();
if (!eosEnabled) {
- stateMgr.checkpoint(recordCollectorOffsets());
+ stateMgr.checkpoint(activeTaskCheckpointableOffsets());
}
commitOffsets(startNewTransaction);
@@ -391,8 +391,13 @@ void commit(final boolean startNewTransaction) {
}
@Override
- protected Map<TopicPartition, Long> recordCollectorOffsets() {
- return recordCollector.offsets();
+ protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
+ final Map<TopicPartition, Long> checkpointableOffsets =
recordCollector.offsets();
+ for (final Map.Entry<TopicPartition, Long> entry :
consumedOffsets.entrySet()) {
+ checkpointableOffsets.putIfAbsent(entry.getKey(),
entry.getValue());
+ }
+
+ return checkpointableOffsets;
}
@Override
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 37101de344a..3b8c9bd47d9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -273,11 +273,17 @@ public void shouldUseDefaultNodeAndStoreNames() {
}
@Test
- public void shouldReuseSourceTopicAsChangelogs() {
+ public void shouldReuseSourceTopicAsChangelogsWithOptimization20() {
final String topic = "topic";
builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("store"));
+ final Topology topology = builder.build();
+ final Properties props = StreamsTestUtils.minimalStreamsConfig();
+ props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
- final InternalTopologyBuilder internalTopologyBuilder =
TopologyWrapper.getInternalTopologyBuilder(builder.build());
+ final InternalTopologyBuilder internalTopologyBuilder =
TopologyWrapper.getInternalTopologyBuilder(topology);
+ internalTopologyBuilder.adjust(new StreamsConfig(props));
+
+ assertThat(internalTopologyBuilder.build().storeToChangelogTopic(),
equalTo(Collections.singletonMap("store", "topic")));
assertThat(internalTopologyBuilder.getStateStores().keySet(),
equalTo(Collections.singleton("store")));
@@ -285,6 +291,23 @@ public void shouldReuseSourceTopicAsChangelogs() {
assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.isEmpty(),
equalTo(true));
}
+
+ @Test
+ public void shouldNotReuseSourceTopicAsChangelogsByDefault() {
+ final String topic = "topic";
+ builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("store"));
+
+ final InternalTopologyBuilder internalTopologyBuilder =
TopologyWrapper.getInternalTopologyBuilder(builder.build());
+ internalTopologyBuilder.setApplicationId("appId");
+
+ assertThat(internalTopologyBuilder.build().storeToChangelogTopic(),
equalTo(Collections.singletonMap("store", "appId-store-changelog")));
+
+ assertThat(internalTopologyBuilder.getStateStores().keySet(),
equalTo(Collections.singleton("store")));
+
+
assertThat(internalTopologyBuilder.getStateStores().get("store").loggingEnabled(),
equalTo(true));
+
+
assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.keySet(),
equalTo(Collections.singleton("appId-store-changelog")));
+ }
@Test(expected = TopologyException.class)
public void shouldThrowExceptionWhenNoTopicPresent() {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index f6d36f70848..dbf85fa46cd 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -19,7 +19,6 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -28,6 +27,7 @@
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
@@ -44,11 +44,14 @@
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -57,10 +60,10 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import java.io.File;
import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@@ -76,6 +79,8 @@
public class RestoreIntegrationTest {
private static final int NUM_BROKERS = 1;
+ private static final String APPID = "restore-test";
+
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER =
new EmbeddedKafkaCluster(NUM_BROKERS);
@@ -83,24 +88,24 @@
private static final String INPUT_STREAM_2 = "input-stream-2";
private final int numberOfKeys = 10000;
private KafkaStreams kafkaStreams;
- private String applicationId = "restore-test";
-
@BeforeClass
public static void createTopics() throws InterruptedException {
CLUSTER.createTopic(INPUT_STREAM, 2, 1);
CLUSTER.createTopic(INPUT_STREAM_2, 2, 1);
+ CLUSTER.createTopic(APPID + "-store-changelog", 2, 1);
}
private Properties props(final String applicationId) {
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
applicationId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
- streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
+
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory(applicationId).getPath());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
1000);
+ streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
return streamsConfiguration;
}
@@ -112,24 +117,106 @@ public void shutdown() {
}
@Test
- public void shouldRestoreState() throws ExecutionException,
InterruptedException {
+ public void shouldRestoreStateFromSourceTopic() throws Exception {
final AtomicInteger numReceived = new AtomicInteger(0);
final StreamsBuilder builder = new StreamsBuilder();
- createStateForRestoration();
+ final Properties props = props(APPID);
+ props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+
+ // restoring from 1000 to 4000 (committed), and then process from 4000
to 5000 on each of the two partitions
+ final int offsetLimitDelta = 1000;
+ final int offsetCheckpointed = 1000;
+ createStateForRestoration(INPUT_STREAM);
+ setCommittedOffset(INPUT_STREAM, offsetLimitDelta);
+
+ final StateDirectory stateDirectory = new StateDirectory(new
StreamsConfig(props), new MockTime());
+ new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new
TaskId(0, 0)), ".checkpoint"))
+ .write(Collections.singletonMap(new
TopicPartition(INPUT_STREAM, 0), (long) offsetCheckpointed));
+ new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new
TaskId(0, 1)), ".checkpoint"))
+ .write(Collections.singletonMap(new
TopicPartition(INPUT_STREAM, 1), (long) offsetCheckpointed));
+
+ final CountDownLatch startupLatch = new CountDownLatch(1);
+ final CountDownLatch shutdownLatch = new CountDownLatch(1);
builder.table(INPUT_STREAM, Consumed.with(Serdes.Integer(),
Serdes.Integer()))
.toStream()
.foreach(new ForeachAction<Integer, Integer>() {
@Override
public void apply(final Integer key, final Integer value) {
- numReceived.incrementAndGet();
+ if (numReceived.incrementAndGet() == 2 *
offsetLimitDelta)
+ shutdownLatch.countDown();
}
});
+ kafkaStreams = new KafkaStreams(builder.build(), props);
+ kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
+ @Override
+ public void onChange(final KafkaStreams.State newState, final
KafkaStreams.State oldState) {
+ if (newState == KafkaStreams.State.RUNNING && oldState ==
KafkaStreams.State.REBALANCING) {
+ startupLatch.countDown();
+ }
+ }
+ });
+
+ final AtomicLong restored = new AtomicLong(0);
+ kafkaStreams.setGlobalStateRestoreListener(new StateRestoreListener() {
+ @Override
+ public void onRestoreStart(final TopicPartition topicPartition,
final String storeName, final long startingOffset, final long endingOffset) {
+
+ }
+
+ @Override
+ public void onBatchRestored(final TopicPartition topicPartition,
final String storeName, final long batchEndOffset, final long numRestored) {
+
+ }
+
+ @Override
+ public void onRestoreEnd(final TopicPartition topicPartition,
final String storeName, final long totalRestored) {
+ restored.addAndGet(totalRestored);
+ }
+ });
+ kafkaStreams.start();
+
+ assertTrue(startupLatch.await(30, TimeUnit.SECONDS));
+ assertThat(restored.get(), equalTo((long) numberOfKeys -
offsetLimitDelta * 2 - offsetCheckpointed * 2));
+
+ assertTrue(shutdownLatch.await(30, TimeUnit.SECONDS));
+ assertThat(numReceived.get(), equalTo(offsetLimitDelta * 2));
+ }
+
+ @Test
+ public void shouldRestoreStateFromChangelogTopic() throws Exception {
+ final AtomicInteger numReceived = new AtomicInteger(0);
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final Properties props = props(APPID);
+
+ // restoring from 1000 to 5000, and then process from 5000 to 10000 on
each of the two partitions
+ final int offsetCheckpointed = 1000;
+ createStateForRestoration(APPID + "-store-changelog");
+ createStateForRestoration(INPUT_STREAM);
+
+ final StateDirectory stateDirectory = new StateDirectory(new
StreamsConfig(props), new MockTime());
+ new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new
TaskId(0, 0)), ".checkpoint"))
+ .write(Collections.singletonMap(new TopicPartition(APPID +
"-store-changelog", 0), (long) offsetCheckpointed));
+ new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new
TaskId(0, 1)), ".checkpoint"))
+ .write(Collections.singletonMap(new TopicPartition(APPID +
"-store-changelog", 1), (long) offsetCheckpointed));
final CountDownLatch startupLatch = new CountDownLatch(1);
- kafkaStreams = new KafkaStreams(builder.build(), props(applicationId));
+ final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+ builder.table(INPUT_STREAM, Consumed.with(Serdes.Integer(),
Serdes.Integer()), Materialized.as("store"))
+ .toStream()
+ .foreach(new ForeachAction<Integer, Integer>() {
+ @Override
+ public void apply(final Integer key, final Integer value) {
+ if (numReceived.incrementAndGet() == numberOfKeys)
+ shutdownLatch.countDown();
+ }
+ });
+
+ kafkaStreams = new KafkaStreams(builder.build(), props);
kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
@Override
public void onChange(final KafkaStreams.State newState, final
KafkaStreams.State oldState) {
@@ -159,8 +246,10 @@ public void onRestoreEnd(final TopicPartition
topicPartition, final String store
kafkaStreams.start();
assertTrue(startupLatch.await(30, TimeUnit.SECONDS));
- assertThat(restored.get(), equalTo((long) numberOfKeys));
- assertThat(numReceived.get(), equalTo(0));
+ assertThat(restored.get(), equalTo((long) numberOfKeys - 2 *
offsetCheckpointed));
+
+ assertTrue(shutdownLatch.await(30, TimeUnit.SECONDS));
+ assertThat(numReceived.get(), equalTo(numberOfKeys));
}
@@ -178,7 +267,7 @@ public Integer apply(final Integer value1, final Integer
value2) {
}, Materialized.<Integer, Integer, KeyValueStore<Bytes,
byte[]>>as("reduce-store").withLoggingDisabled());
final CountDownLatch startupLatch = new CountDownLatch(1);
- kafkaStreams = new KafkaStreams(builder.build(), props(applicationId));
+ kafkaStreams = new KafkaStreams(builder.build(), props(APPID));
kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
@Override
public void onChange(final KafkaStreams.State newState, final
KafkaStreams.State oldState) {
@@ -228,7 +317,7 @@ public void
shouldProcessDataFromStoresWithLoggingDisabled() throws InterruptedE
final Topology topology = streamsBuilder.build();
- kafkaStreams = new KafkaStreams(topology, props(applicationId +
"-logging-disabled"));
+ kafkaStreams = new KafkaStreams(topology, props(APPID +
"-logging-disabled"));
final CountDownLatch latch = new CountDownLatch(1);
kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
@@ -279,8 +368,7 @@ public void close() {
}
}
- private void createStateForRestoration()
- throws ExecutionException, InterruptedException {
+ private void createStateForRestoration(final String changelogTopic) {
final Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
@@ -288,30 +376,33 @@ private void createStateForRestoration()
new KafkaProducer<>(producerConfig, new
IntegerSerializer(), new IntegerSerializer())) {
for (int i = 0; i < numberOfKeys; i++) {
- producer.send(new ProducerRecord<>(INPUT_STREAM, i, i));
+ producer.send(new ProducerRecord<>(changelogTopic, i, i));
}
}
+ }
+ private void setCommittedOffset(final String topic, final int limitDelta) {
final Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
- consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, applicationId);
+ consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, APPID);
+ consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "commit-consumer");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
IntegerDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
IntegerDeserializer.class);
final Consumer consumer = new KafkaConsumer(consumerConfig);
- final List<TopicPartition> partitions = Arrays.asList(new
TopicPartition(INPUT_STREAM, 0),
- new
TopicPartition(INPUT_STREAM, 1));
+ final List<TopicPartition> partitions = Arrays.asList(
+ new TopicPartition(topic, 0),
+ new TopicPartition(topic, 1));
consumer.assign(partitions);
consumer.seekToEnd(partitions);
- final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition partition : partitions) {
final long position = consumer.position(partition);
- offsets.put(partition, new OffsetAndMetadata(position + 1));
+ consumer.seek(partition, position - limitDelta);
}
- consumer.commitSync(offsets);
+ consumer.commitSync();
consumer.close();
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
index b5e6fcb63b9..5fab6660c4e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
@@ -55,8 +55,8 @@ public void prepareTopology() throws InterruptedException {
appID = "table-table-join-integration-test";
builder = new StreamsBuilder();
- leftTable = builder.table(INPUT_TOPIC_LEFT);
- rightTable = builder.table(INPUT_TOPIC_RIGHT);
+ leftTable = builder.table(INPUT_TOPIC_LEFT, Materialized.<Long,
String, KeyValueStore<Bytes, byte[]>>as("left").withLoggingDisabled());
+ rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.<Long,
String, KeyValueStore<Bytes, byte[]>>as("right").withLoggingDisabled());
}
final private String expectedFinalJoinResult = "D-d";
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 63432ffc439..ef3fcd6110f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -137,7 +137,7 @@ public void
shouldStillMaterializeSourceKTableIfMaterializedIsntQueryable() {
assertEquals(storeName, topology.stateStores().get(0).name());
assertEquals(1, topology.storeToChangelogTopic().size());
- assertEquals("topic2",
topology.storeToChangelogTopic().get(storeName));
+ assertEquals("app-id-prefix-STATE-STORE-0000000000-changelog",
topology.storeToChangelogTopic().get(storeName));
assertNull(table1.queryableStoreName());
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 936c67b8ef8..97d605265db 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -21,7 +21,6 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.MockConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Cluster;
@@ -59,6 +58,7 @@
import org.apache.kafka.streams.processor.ThreadMetadata;
import
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.MockTimestampExtractor;
@@ -69,6 +69,8 @@
import org.junit.Before;
import org.junit.Test;
+import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -84,6 +86,7 @@
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static
org.apache.kafka.streams.processor.internals.AbstractStateManager.CHECKPOINT_FILE_NAME;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -821,12 +824,13 @@ public void
shouldReturnStandbyTaskMetadataWhileRunningState() {
@SuppressWarnings("unchecked")
@Test
- public void shouldUpdateStandbyTask() {
+ public void shouldUpdateStandbyTask() throws IOException {
final String storeName1 = "count-one";
final String storeName2 = "table-two";
- final String changelogName = applicationId + "-" + storeName1 +
"-changelog";
- final TopicPartition partition1 = new TopicPartition(changelogName, 1);
- final TopicPartition partition2 = t2p1;
+ final String changelogName1 = applicationId + "-" + storeName1 +
"-changelog";
+ final String changelogName2 = applicationId + "-" + storeName2 +
"-changelog";
+ final TopicPartition partition1 = new TopicPartition(changelogName1,
1);
+ final TopicPartition partition2 = new TopicPartition(changelogName2,
1);
internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
.groupByKey().count(Materialized.<Object, Long,
KeyValueStore<Bytes, byte[]>>as(storeName1));
final MaterializedInternal materialized = new
MaterializedInternal(Materialized.as(storeName2));
@@ -835,10 +839,10 @@ public void shouldUpdateStandbyTask() {
final StreamThread thread = createStreamThread(clientId, config,
false);
final MockConsumer<byte[], byte[]> restoreConsumer =
clientSupplier.restoreConsumer;
- restoreConsumer.updatePartitions(changelogName,
+ restoreConsumer.updatePartitions(changelogName1,
singletonList(
new PartitionInfo(
- changelogName,
+ changelogName1,
1,
null,
new Node[0],
@@ -852,13 +856,13 @@ public void shouldUpdateStandbyTask() {
restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition1,
0L));
restoreConsumer.updateEndOffsets(Collections.singletonMap(partition2,
10L));
restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition2,
0L));
- // let the store1 be restored from 0 to 10; store2 be restored from 0
to (committed offset) 5
- clientSupplier.consumer.assign(Utils.mkSet(partition2));
-
clientSupplier.consumer.commitSync(Collections.singletonMap(partition2, new
OffsetAndMetadata(5L, "")));
+ // let the store1 be restored from 0 to 10; store2 be restored from 5
(checkpointed) to 10
+ OffsetCheckpoint checkpoint = new OffsetCheckpoint(new
File(stateDirectory.directoryForTask(task3), CHECKPOINT_FILE_NAME));
+ checkpoint.write(Collections.singletonMap(partition2, 5L));
for (long i = 0L; i < 10L; i++) {
- restoreConsumer.addRecord(new ConsumerRecord<>(changelogName, 1,
i, ("K" + i).getBytes(), ("V" + i).getBytes()));
- restoreConsumer.addRecord(new ConsumerRecord<>(topic2, 1, i, ("K"
+ i).getBytes(), ("V" + i).getBytes()));
+ restoreConsumer.addRecord(new ConsumerRecord<>(changelogName1, 1,
i, ("K" + i).getBytes(), ("V" + i).getBytes()));
+ restoreConsumer.addRecord(new ConsumerRecord<>(changelogName2, 1,
i, ("K" + i).getBytes(), ("V" + i).getBytes()));
}
thread.setState(StreamThread.State.RUNNING);
@@ -884,9 +888,7 @@ public void shouldUpdateStandbyTask() {
assertEquals(10L, store1.approximateNumEntries());
assertEquals(5L, store2.approximateNumEntries());
- assertEquals(Collections.singleton(partition2),
restoreConsumer.paused());
- assertEquals(1, thread.standbyRecords().size());
- assertEquals(5, thread.standbyRecords().get(partition2).size());
+ assertEquals(0, thread.standbyRecords().size());
}
@Test
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index a32d193a171..4327e8f1ee4 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -799,6 +799,7 @@ public Object apply(final Object value1, final Object
value2) {
final Map<String, Integer> expectedCreatedInternalTopics = new
HashMap<>();
expectedCreatedInternalTopics.put(applicationId +
"-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 4);
expectedCreatedInternalTopics.put(applicationId +
"-KTABLE-AGGREGATE-STATE-STORE-0000000006-changelog", 4);
+ expectedCreatedInternalTopics.put(applicationId +
"-topic3-STATE-STORE-0000000002-changelog", 4);
expectedCreatedInternalTopics.put(applicationId +
"-KSTREAM-MAP-0000000001-repartition", 4);
// check if all internal topics were created as expected
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Source KTable checkpoint is not correct
> ---------------------------------------
>
> Key: KAFKA-7021
> URL: https://issues.apache.org/jira/browse/KAFKA-7021
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.0.0
> Reporter: Matthias J. Sax
> Assignee: Guozhang Wang
> Priority: Major
> Fix For: 0.10.2.2, 2.0.0, 0.11.0.3, 1.0.2, 1.1.1
>
>
> Kafka Streams treats source KTables, i.e., tables created via `builder.table()`,
> differently. Instead of creating a changelog topic, the original source topic
> is used to avoid unnecessary data redundancy.
> However, Kafka Streams does not write a correct local state checkpoint file.
> This results in unnecessary state restoration after a rebalance: instead of the
> latest committed offset, the latest restored offset is written into the
> checkpoint file in `ProcessorStateManager#close()`.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)