This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.6 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push: new 516ab2d KAFKA-10005: Decouple RestoreListener from RestoreCallback (#8676) 516ab2d is described below commit 516ab2d938b1f59d5b7a2e799ae7c7d553d8e446 Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Mon Jun 8 14:17:45 2020 -0700 KAFKA-10005: Decouple RestoreListener from RestoreCallback (#8676) And remove bulk loading mechanism inside RocksDB. Reviewers: John Roesler <vvcep...@apache.org>, A. Sophie Blee-Goldman <sop...@confluent.io> --- .../AbstractNotifyingBatchingRestoreCallback.java | 83 -------- .../AbstractNotifyingRestoreCallback.java | 72 ------- .../processor/BatchingStateRestoreCallback.java | 4 + .../streams/processor/StateRestoreListener.java | 9 +- .../processor/internals/ChangelogRegister.java | 3 +- .../internals/CompositeRestoreListener.java | 116 ----------- .../processor/internals/ProcessorStateManager.java | 18 +- .../RecordBatchingStateRestoreCallback.java | 5 - .../processor/internals/StoreChangelogReader.java | 48 +---- .../AbstractRocksDBSegmentedBytesStore.java | 43 +--- ...ulkLoadingStore.java => BatchWritingStore.java} | 3 +- .../streams/state/internals/RocksDBStore.java | 60 +----- .../state/internals/RocksDBTimestampedStore.java | 15 -- .../kafka/streams/state/internals/Segment.java | 2 +- .../internals/CompositeRestoreListenerTest.java | 222 --------------------- .../processor/internals/MockChangelogReader.java | 2 +- .../internals/ProcessorStateManagerTest.java | 18 +- .../internals/StoreChangelogReaderTest.java | 85 +------- .../AbstractRocksDBSegmentedBytesStoreTest.java | 31 --- .../internals/RocksDBSegmentedBytesStoreTest.java | 7 - .../streams/state/internals/RocksDBStoreTest.java | 50 ----- .../RocksDBTimestampedSegmentedBytesStoreTest.java | 7 - .../kafka/test/InternalMockProcessorContext.java | 20 -- .../test/MockBatchingStateRestoreListener.java | 44 ---- .../org/apache/kafka/test/MockRestoreCallback.java | 1 - .../kafka/test/MockStateRestoreListener.java | 14 +- 26 files changed, 39 insertions(+), 943 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractNotifyingBatchingRestoreCallback.java b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractNotifyingBatchingRestoreCallback.java deleted file mode 100644 index 7b5b5d0..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractNotifyingBatchingRestoreCallback.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.processor; - - -import org.apache.kafka.common.TopicPartition; - -/** - * Abstract implementation of the {@link BatchingStateRestoreCallback} used for batch restoration operations. - * - * Includes default no-op methods of the {@link StateRestoreListener} {@link StateRestoreListener#onRestoreStart(TopicPartition, String, long, long)}, - * {@link StateRestoreListener#onBatchRestored(TopicPartition, String, long, long)}, and {@link StateRestoreListener#onRestoreEnd(TopicPartition, String, long)}. - */ -public abstract class AbstractNotifyingBatchingRestoreCallback implements BatchingStateRestoreCallback, StateRestoreListener { - - /** - * Single put restore operations not supported, please use {@link AbstractNotifyingRestoreCallback} - * or {@link StateRestoreCallback} instead for single action restores. - */ - @Override - public void restore(final byte[] key, - final byte[] value) { - throw new UnsupportedOperationException("Single restore not supported"); - } - - - /** - * @see StateRestoreListener#onRestoreStart(TopicPartition, String, long, long) - * - * This method does nothing by default; if desired, subclasses should override it with custom functionality. - * - */ - @Override - public void onRestoreStart(final TopicPartition topicPartition, - final String storeName, - final long startingOffset, - final long endingOffset) { - - } - - - /** - * @see StateRestoreListener#onBatchRestored(TopicPartition, String, long, long) - * - * This method does nothing by default; if desired, subclasses should override it with custom functionality. - * - */ - @Override - public void onBatchRestored(final TopicPartition topicPartition, - final String storeName, - final long batchEndOffset, - final long numRestored) { - - } - - /** - * @see StateRestoreListener#onRestoreEnd(TopicPartition, String, long) - * - * This method does nothing by default; if desired, subclasses should override it with custom functionality. - * - */ - @Override - public void onRestoreEnd(final TopicPartition topicPartition, - final String storeName, - final long totalRestored) { - - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractNotifyingRestoreCallback.java b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractNotifyingRestoreCallback.java deleted file mode 100644 index 2eb3f66..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractNotifyingRestoreCallback.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.processor; - -import org.apache.kafka.common.TopicPartition; - -/** - * Abstract implementation of the {@link StateRestoreCallback} used for batch restoration operations. - * - * Includes default no-op methods of the {@link StateRestoreListener} {@link StateRestoreListener#onRestoreStart(TopicPartition, String, long, long)}, - * {@link StateRestoreListener#onBatchRestored(TopicPartition, String, long, long)}, and {@link StateRestoreListener#onRestoreEnd(TopicPartition, String, long)}. - */ -public abstract class AbstractNotifyingRestoreCallback implements StateRestoreCallback, StateRestoreListener { - - - /** - * @see StateRestoreListener#onRestoreStart(TopicPartition, String, long, long) - * - * This method does nothing by default; if desired, subclasses should override it with custom functionality. - * - */ - @Override - public void onRestoreStart(final TopicPartition topicPartition, - final String storeName, - final long startingOffset, - final long endingOffset) { - - } - - - /** - * @see StateRestoreListener#onBatchRestored(TopicPartition, String, long, long) - * - * This method does nothing by default; if desired, subclasses should override it with custom functionality. - * - */ - @Override - public void onBatchRestored(final TopicPartition topicPartition, - final String storeName, - final long batchEndOffset, - final long numRestored) { - - } - - /** - * @see StateRestoreListener#onRestoreEnd(TopicPartition, String, long) - * - * This method does nothing by default; if desired, subclasses should override it with custom functionality. - * - */ - @Override - public void onRestoreEnd(final TopicPartition topicPartition, - final String storeName, - final long totalRestored) { - - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/BatchingStateRestoreCallback.java b/streams/src/main/java/org/apache/kafka/streams/processor/BatchingStateRestoreCallback.java index 3447a57..d29c7ba 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/BatchingStateRestoreCallback.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/BatchingStateRestoreCallback.java @@ -38,4 +38,8 @@ public interface BatchingStateRestoreCallback extends StateRestoreCallback { */ void restoreAll(Collection<KeyValue<byte[], byte[]>> records); + @Override + default void restore(byte[] key, byte[] value) { + throw new UnsupportedOperationException(); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java index ea1c288..210a5de 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java @@ -29,9 +29,12 @@ import org.apache.kafka.common.TopicPartition; * Users desiring stateful operations will need to provide synchronization internally in * the {@code StateRestorerListener} implementation. * - * When used for monitoring a single {@link StateStore} using either {@link AbstractNotifyingRestoreCallback} or - * {@link AbstractNotifyingBatchingRestoreCallback} no synchronization is necessary - * as each StreamThread has its own StateStore instance. + * Note that this listener is only registered at the per-client level and users can base on the {@code storeName} + * parameter to define specific monitoring for different {@link StateStore}s. There is another + * {@link StateRestoreCallback} interface which is registered via the {@link ProcessorContext#register(StateStore, StateRestoreCallback)} + * function per-store, and it is used to apply the fetched changelog records into the local state store during restoration. + * These two interfaces serve different restoration purposes and users should not try to implement both of them in a single + * class during state store registration. * * Incremental updates are exposed so users can estimate how much progress has been made. */ diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java index cdddd20..7403aad 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java @@ -34,7 +34,6 @@ interface ChangelogRegister { /** * Unregisters and removes the passed in partitions from the set of changelogs * @param removedPartitions the set of partitions to remove - * @param triggerOnRestoreEnd whether to trigger the onRestoreEnd callback */ - void unregister(final Collection<TopicPartition> removedPartitions, final boolean triggerOnRestoreEnd); + void unregister(final Collection<TopicPartition> removedPartitions); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java deleted file mode 100644 index 7cccad6..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.processor.internals; - - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback; -import org.apache.kafka.streams.processor.StateRestoreCallback; -import org.apache.kafka.streams.processor.StateRestoreListener; - -import java.util.Collection; - -public class CompositeRestoreListener implements RecordBatchingStateRestoreCallback, StateRestoreListener { - - public static final NoOpStateRestoreListener NO_OP_STATE_RESTORE_LISTENER = new NoOpStateRestoreListener(); - private final RecordBatchingStateRestoreCallback internalBatchingRestoreCallback; - private final StateRestoreListener storeRestoreListener; - private StateRestoreListener userRestoreListener = NO_OP_STATE_RESTORE_LISTENER; - - CompositeRestoreListener(final StateRestoreCallback stateRestoreCallback) { - - if (stateRestoreCallback instanceof StateRestoreListener) { - storeRestoreListener = (StateRestoreListener) stateRestoreCallback; - } else { - storeRestoreListener = NO_OP_STATE_RESTORE_LISTENER; - } - - internalBatchingRestoreCallback = StateRestoreCallbackAdapter.adapt(stateRestoreCallback); - } - - /** - * @throws StreamsException if user provided {@link StateRestoreListener} raises an exception in - * {@link StateRestoreListener#onRestoreStart(TopicPartition, String, long, long)} - */ - @Override - public void onRestoreStart(final TopicPartition topicPartition, - final String storeName, - final long startingOffset, - final long endingOffset) { - userRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset); - storeRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset); - } - - /** - * @throws StreamsException if user provided {@link StateRestoreListener} raises an exception in - * {@link StateRestoreListener#onBatchRestored(TopicPartition, String, long, long)} - */ - @Override - public void onBatchRestored(final TopicPartition topicPartition, - final String storeName, - final long batchEndOffset, - final long numRestored) { - userRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored); - storeRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored); - } - - /** - * @throws StreamsException if user provided {@link StateRestoreListener} raises an exception in - * {@link StateRestoreListener#onRestoreEnd(TopicPartition, String, long)} - */ - @Override - public void onRestoreEnd(final TopicPartition topicPartition, - final String storeName, - final long totalRestored) { - userRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored); - storeRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored); - } - - @Override - public void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) { - internalBatchingRestoreCallback.restoreBatch(records); - } - - void setUserRestoreListener(final StateRestoreListener userRestoreListener) { - if (userRestoreListener != null) { - this.userRestoreListener = userRestoreListener; - } - } - - @Override - public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) { - throw new UnsupportedOperationException(); - } - - @Override - public void restore(final byte[] key, - final byte[] value) { - throw new UnsupportedOperationException("Single restore functionality shouldn't be called directly but " - + "through the delegated StateRestoreCallback instance"); - } - - private static final class NoOpStateRestoreListener extends AbstractNotifyingBatchingRestoreCallback implements RecordBatchingStateRestoreCallback { - @Override - public void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) { - - } - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index f00284f..d78c9a3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -25,6 +25,7 @@ import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.errors.TaskMigratedException; +import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; @@ -131,10 +132,6 @@ public class ProcessorStateManager implements StateManager { return this.stateStore; } - StateRestoreCallback restoreCallback() { - return this.restoreCallback; - } - @Override public String toString() { return "StateStoreMetadata (" + stateStore.name() + " : " + changelogPartition + " @ " + offset; @@ -303,6 +300,11 @@ public class ProcessorStateManager implements StateManager { throw new IllegalArgumentException(format("%sStore %s has already been registered.", logPrefix, storeName)); } + if (stateRestoreCallback instanceof StateRestoreListener) { + log.warn("The registered state restore callback is also implementing the state restore listener interface, " + + "which is not expected and would be ignored"); + } + final StateStoreMetadata storeMetadata = isLoggingEnabled(storeName) ? new StateStoreMetadata( store, @@ -459,7 +461,7 @@ public class ProcessorStateManager implements StateManager { public void close() throws ProcessorStateException { log.debug("Closing its state manager and all the registered state stores: {}", stores); - changelogReader.unregister(getAllChangelogTopicPartitions(), false); + changelogReader.unregister(getAllChangelogTopicPartitions()); RuntimeException firstException = null; // attempting to close the stores, just in case they @@ -499,11 +501,7 @@ public class ProcessorStateManager implements StateManager { log.debug("Recycling state for {} task {}.", taskType, taskId); final List<TopicPartition> allChangelogs = getAllChangelogTopicPartitions(); - if (taskType.equals(TaskType.ACTIVE)) { - changelogReader.unregister(allChangelogs, true); - } else { - changelogReader.unregister(allChangelogs, false); - } + changelogReader.unregister(allChangelogs); } void transitionTaskType(final TaskType newType, final LogContext logContext) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordBatchingStateRestoreCallback.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordBatchingStateRestoreCallback.java index 78a885d..300b60d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordBatchingStateRestoreCallback.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordBatchingStateRestoreCallback.java @@ -29,9 +29,4 @@ public interface RecordBatchingStateRestoreCallback extends BatchingStateRestore default void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) { throw new UnsupportedOperationException(); } - - @Override - default void restore(final byte[] key, final byte[] value) { - throw new UnsupportedOperationException(); - } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index 6c6ff39..cecbbd3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.processor.internals; -import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -29,7 +28,6 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskCorruptedException; -import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata; @@ -524,13 +522,6 @@ public class StoreChangelogReader implements ChangelogReader { // do not trigger restore listener if we are processing standby tasks if (changelogMetadata.stateManager.taskType() == Task.TaskType.ACTIVE) { try { - // first trigger the store's specific listener if its registered callback is also an lister, - // then trigger the user registered global listener - final StateRestoreCallback restoreCallback = storeMetadata.restoreCallback(); - if (restoreCallback instanceof StateRestoreListener) { - ((StateRestoreListener) restoreCallback).onBatchRestored(partition, storeName, currentOffset, numRecords); - } - stateRestoreListener.onBatchRestored(partition, storeName, currentOffset, numRecords); } catch (final Exception e) { throw new StreamsException("State restore listener failed on batch restored", e); @@ -547,13 +538,6 @@ public class StoreChangelogReader implements ChangelogReader { pauseChangelogsFromRestoreConsumer(Collections.singleton(partition)); try { - // first trigger the store's specific listener if its registered callback is also an listener, - // then trigger the user registered global listener - final StateRestoreCallback restoreCallback = storeMetadata.restoreCallback(); - if (restoreCallback instanceof StateRestoreListener) { - ((StateRestoreListener) restoreCallback).onRestoreEnd(partition, storeName, changelogMetadata.totalRestored); - } - stateRestoreListener.onRestoreEnd(partition, storeName, changelogMetadata.totalRestored); } catch (final Exception e) { throw new StreamsException("State restore listener failed on restore completed", e); @@ -797,13 +781,6 @@ public class StoreChangelogReader implements ChangelogReader { } try { - // first trigger the store's specific listener if its registered callback is also an lister, - // then trigger the user registered global listener - final StateRestoreCallback restoreCallback = storeMetadata.restoreCallback(); - if (restoreCallback instanceof StateRestoreListener) { - ((StateRestoreListener) restoreCallback).onRestoreStart(partition, storeName, startOffset, changelogMetadata.restoreEndOffset); - } - stateRestoreListener.onRestoreStart(partition, storeName, startOffset, changelogMetadata.restoreEndOffset); } catch (final Exception e) { throw new StreamsException("State restore listener failed on batch restored", e); @@ -812,37 +789,14 @@ public class StoreChangelogReader implements ChangelogReader { } } - private RuntimeException invokeOnRestoreEnd(final TopicPartition partition, - final ChangelogMetadata changelogMetadata) { - // only trigger the store's specific listener to make sure we disable bulk loading before transition to standby - final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata; - final StateRestoreCallback restoreCallback = storeMetadata.restoreCallback(); - final String storeName = storeMetadata.store().name(); - if (restoreCallback instanceof StateRestoreListener) { - try { - ((StateRestoreListener) restoreCallback).onRestoreEnd(partition, storeName, changelogMetadata.totalRestored); - } catch (final RuntimeException e) { - return e; - } - } - return null; - } - @Override - public void unregister(final Collection<TopicPartition> revokedChangelogs, - final boolean triggerOnRestoreEnd) { - final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null); - + public void unregister(final Collection<TopicPartition> revokedChangelogs) { // Only changelogs that are initialized have been added to the restore consumer's assignment final List<TopicPartition> revokedInitializedChangelogs = new ArrayList<>(); for (final TopicPartition partition : revokedChangelogs) { final ChangelogMetadata changelogMetadata = changelogs.remove(partition); if (changelogMetadata != null) { - if (triggerOnRestoreEnd && changelogMetadata.state().equals(ChangelogState.RESTORING)) { - firstException.compareAndSet(null, invokeOnRestoreEnd(partition, changelogMetadata)); - } - if (!changelogMetadata.state().equals(ChangelogState.REGISTERED)) { revokedInitializedChangelogs.add(partition); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java index ce64bf2..f0979f6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java @@ -17,16 +17,14 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.ProcessorStateException; -import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback; +import org.apache.kafka.streams.processor.BatchingStateRestoreCallback; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; -import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; import org.apache.kafka.streams.state.KeyValueIterator; @@ -37,10 +35,8 @@ import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements SegmentedBytesStore { private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBSegmentedBytesStore.class); @@ -50,7 +46,6 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se private final KeySchema keySchema; private InternalProcessorContext context; private volatile boolean open; - private Set<S> bulkLoadSegments; private Sensor expiredRecordSensor; private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; @@ -186,8 +181,6 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se segments.openExisting(this.context, observedStreamTime); - bulkLoadSegments = new HashSet<>(segments.allSegments()); - // register and possibly restore the state from the logs context.register(root, new RocksDBSegmentsBatchingRestoreCallback()); @@ -249,17 +242,6 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se final long segmentId = segments.segmentId(timestamp); final S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime); if (segment != null) { - // This handles the case that state store is moved to a new client and does not - // have the local RocksDB instance for the segment. In this case, toggleDBForBulkLoading - // will only close the database and open it again with bulk loading enabled. - if (!bulkLoadSegments.contains(segment) && context.taskType() == TaskType.ACTIVE) { - segment.toggleDbForBulkLoading(true); - // If the store does not exist yet, the getOrCreateSegmentIfLive will call openDB that - // makes the open flag for the newly created store. - // if the store does exist already, then toggleDbForBulkLoading will make sure that - // the store is already open here. - bulkLoadSegments = new HashSet<>(segments.allSegments()); - } try { final WriteBatch batch = writeBatchMap.computeIfAbsent(segment, s -> new WriteBatch()); segment.addToBatch(record, batch); @@ -271,32 +253,11 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se return writeBatchMap; } - private void toggleForBulkLoading(final boolean prepareForBulkload) { - for (final S segment : segments.allSegments()) { - segment.toggleDbForBulkLoading(prepareForBulkload); - } - } - - private class RocksDBSegmentsBatchingRestoreCallback extends AbstractNotifyingBatchingRestoreCallback { + private class RocksDBSegmentsBatchingRestoreCallback implements BatchingStateRestoreCallback { @Override public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) { restoreAllInternal(records); } - - @Override - public void onRestoreStart(final TopicPartition topicPartition, - final String storeName, - final long startingOffset, - final long endingOffset) { - toggleForBulkLoading(true); - } - - @Override - public void onRestoreEnd(final TopicPartition topicPartition, - final String storeName, - final long totalRestored) { - toggleForBulkLoading(false); - } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/BulkLoadingStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/BatchWritingStore.java similarity index 91% rename from streams/src/main/java/org/apache/kafka/streams/state/internals/BulkLoadingStore.java rename to streams/src/main/java/org/apache/kafka/streams/state/internals/BatchWritingStore.java index 1e27cc2..2ac1e3b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/BulkLoadingStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/BatchWritingStore.java @@ -20,8 +20,7 @@ import org.apache.kafka.streams.KeyValue; import org.rocksdb.RocksDBException; import org.rocksdb.WriteBatch; -public interface BulkLoadingStore { - void toggleDbForBulkLoading(final boolean prepareForBulkload); +public interface BatchWritingStore { void addToBatch(final KeyValue<byte[], byte[]> record, final WriteBatch batch) throws RocksDBException; void write(final WriteBatch batch) throws RocksDBException; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index a57f31e..fc2bb24 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor.RecordingLevel; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; @@ -24,7 +23,6 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.errors.ProcessorStateException; -import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback; import org.apache.kafka.streams.processor.BatchingStateRestoreCallback; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; @@ -73,7 +71,7 @@ import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CON /** * A persistent key-value store based on RocksDB. */ -public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingStore { +public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingStore { private static final Logger log = LoggerFactory.getLogger(RocksDBStore.class); private static final Pattern SST_FILE_EXTENSION = Pattern.compile(".*\\.sst"); @@ -106,7 +104,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt private final RocksDBMetricsRecorder metricsRecorder; private boolean isStatisticsRegistered = false; - private volatile boolean prepareForBulkload = false; ProcessorContext internalProcessorContext; // visible for testing volatile BatchingStateRestoreCallback batchingStateRestoreCallback = null; @@ -175,10 +172,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt configSetter.setConfig(name, userSpecifiedOptions, configs); } - if (prepareForBulkload) { - userSpecifiedOptions.prepareForBulkLoad(); - } - dbDir = new File(new File(context.stateDir(), parentDir), name); try { @@ -236,11 +229,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt context.register(root, batchingStateRestoreCallback); } - // visible for testing - boolean isPrepareForBulkload() { - return prepareForBulkload; - } - @Override public String name() { return name; @@ -262,7 +250,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt } } - @SuppressWarnings("unchecked") @Override public synchronized void put(final Bytes key, final byte[] value) { @@ -391,22 +378,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt } @Override - public void toggleDbForBulkLoading(final boolean prepareForBulkload) { - if (prepareForBulkload) { - // if the store is not empty, we need to compact to get around the num.levels check for bulk loading - final String[] sstFileNames = dbDir.list((dir, name) -> SST_FILE_EXTENSION.matcher(name).matches()); - - if (sstFileNames != null && sstFileNames.length > 0) { - dbAccessor.toggleDbForBulkLoading(); - } - } - - close(); - this.prepareForBulkload = prepareForBulkload; - openDB(internalProcessorContext); - } - - @Override public void addToBatch(final KeyValue<byte[], byte[]> record, final WriteBatch batch) throws RocksDBException { dbAccessor.addToBatch(record.key, record.value, batch); @@ -506,8 +477,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt final WriteBatch batch) throws RocksDBException; void close(); - - void toggleDbForBulkLoading(); } class SingleColumnFamilyAccessor implements RocksDBAccessor { @@ -607,20 +576,10 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt public void close() { columnFamily.close(); } - - @Override - @SuppressWarnings("deprecation") - public void toggleDbForBulkLoading() { - try { - db.compactRange(columnFamily, true, 1, 0); - } catch (final RocksDBException e) { - throw new ProcessorStateException("Error while range compacting during restoring store " + name, e); - } - } } // not private for testing - static class RocksDBBatchingRestoreCallback extends AbstractNotifyingBatchingRestoreCallback { + static class RocksDBBatchingRestoreCallback implements BatchingStateRestoreCallback { private final RocksDBStore rocksDBStore; @@ -637,21 +596,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt throw new ProcessorStateException("Error restoring batch to store " + rocksDBStore.name, e); } } - - @Override - public void onRestoreStart(final TopicPartition topicPartition, - final String storeName, - final long startingOffset, - final long endingOffset) { - rocksDBStore.toggleDbForBulkLoading(true); - } - - @Override - public void onRestoreEnd(final TopicPartition topicPartition, - final String storeName, - final long totalRestored) { - rocksDBStore.toggleDbForBulkLoading(false); - } } // for testing diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java index bc6c17f..6c31e9b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java @@ -249,21 +249,6 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped oldColumnFamily.close(); newColumnFamily.close(); } - - @Override - @SuppressWarnings("deprecation") - public void toggleDbForBulkLoading() { - try { - db.compactRange(oldColumnFamily, true, 1, 0); - } catch (final RocksDBException e) { - throw new ProcessorStateException("Error while range compacting during restoring store " + name, e); - } - try { - db.compactRange(newColumnFamily, true, 1, 0); - } catch (final RocksDBException e) { - throw new ProcessorStateException("Error while range compacting during restoring store " + name, e); - } - } } private class RocksDBDualCFIterator extends AbstractIterator<KeyValue<Bytes, byte[]>> diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java index fe1fc33..c86ee96 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java @@ -22,7 +22,7 @@ import org.apache.kafka.streams.state.KeyValueStore; import java.io.IOException; -public interface Segment extends KeyValueStore<Bytes, byte[]>, BulkLoadingStore { +public interface Segment extends KeyValueStore<Bytes, byte[]>, BatchWritingStore { void destroy() throws IOException; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java deleted file mode 100644 index c981107..0000000 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.processor.internals; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.processor.BatchingStateRestoreCallback; -import org.apache.kafka.streams.processor.StateRestoreCallback; -import org.apache.kafka.test.MockBatchingStateRestoreListener; -import org.apache.kafka.test.MockStateRestoreListener; -import org.junit.Test; - -import java.nio.charset.StandardCharsets; -import java.util.Collection; -import java.util.Collections; - -import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_BATCH; -import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_END; -import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_START; -import static org.hamcrest.core.Is.is; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.assertTrue; - - -public class CompositeRestoreListenerTest { - - private final MockStateRestoreCallback stateRestoreCallback = new MockStateRestoreCallback(); - private final MockBatchingStateRestoreListener batchingStateRestoreCallback = new MockBatchingStateRestoreListener(); - private final MockNoListenBatchingStateRestoreCallback - noListenBatchingStateRestoreCallback = - new MockNoListenBatchingStateRestoreCallback(); - private final MockStateRestoreListener reportingStoreListener = new MockStateRestoreListener(); - private final byte[] key = "key".getBytes(StandardCharsets.UTF_8); - private final byte[] value = "value".getBytes(StandardCharsets.UTF_8); - private final Collection<KeyValue<byte[], byte[]>> records = Collections.singletonList(KeyValue.pair(key, value)); - private final Collection<ConsumerRecord<byte[], byte[]>> consumerRecords = Collections.singletonList( - new ConsumerRecord<>("", 0, 0L, key, value) - ); - private final String storeName = "test_store"; - private final long startOffset = 0L; - private final long endOffset = 1L; - private final long batchOffset = 1L; - private final long numberRestored = 1L; - private final TopicPartition topicPartition = new TopicPartition("testTopic", 1); - - private CompositeRestoreListener compositeRestoreListener; - - - @Test - public void shouldRestoreInNonBatchMode() { - setUpCompositeRestoreListener(stateRestoreCallback); - compositeRestoreListener.restoreBatch(consumerRecords); - assertThat(stateRestoreCallback.restoredKey, is(key)); - assertThat(stateRestoreCallback.restoredValue, is(value)); - } - - @Test - public void shouldRestoreInBatchMode() { - setUpCompositeRestoreListener(batchingStateRestoreCallback); - compositeRestoreListener.restoreBatch(consumerRecords); - assertThat(batchingStateRestoreCallback.getRestoredRecords(), is(records)); - } - - @Test - public void shouldNotifyRestoreStartNonBatchMode() { - setUpCompositeRestoreListener(stateRestoreCallback); - compositeRestoreListener.onRestoreStart(topicPartition, storeName, startOffset, endOffset); - assertStateRestoreListenerOnStartNotification(stateRestoreCallback); - assertStateRestoreListenerOnStartNotification(reportingStoreListener); - } - - @Test - public void shouldNotifyRestoreStartBatchMode() { - setUpCompositeRestoreListener(batchingStateRestoreCallback); - compositeRestoreListener.onRestoreStart(topicPartition, storeName, startOffset, endOffset); - assertStateRestoreListenerOnStartNotification(batchingStateRestoreCallback); - assertStateRestoreListenerOnStartNotification(reportingStoreListener); - } - - @Test - public void shouldNotifyRestoreProgressNonBatchMode() { - setUpCompositeRestoreListener(stateRestoreCallback); - compositeRestoreListener.onBatchRestored(topicPartition, storeName, endOffset, numberRestored); - assertStateRestoreListenerOnBatchCompleteNotification(stateRestoreCallback); - assertStateRestoreListenerOnBatchCompleteNotification(reportingStoreListener); - } - - @Test - public void shouldNotifyRestoreProgressBatchMode() { - setUpCompositeRestoreListener(batchingStateRestoreCallback); - compositeRestoreListener.onBatchRestored(topicPartition, storeName, endOffset, numberRestored); - assertStateRestoreListenerOnBatchCompleteNotification(batchingStateRestoreCallback); - assertStateRestoreListenerOnBatchCompleteNotification(reportingStoreListener); - } - - @Test - public void shouldNotifyRestoreEndInNonBatchMode() { - setUpCompositeRestoreListener(stateRestoreCallback); - compositeRestoreListener.onRestoreEnd(topicPartition, storeName, numberRestored); - assertStateRestoreOnEndNotification(stateRestoreCallback); - assertStateRestoreOnEndNotification(reportingStoreListener); - } - - @Test - public void shouldNotifyRestoreEndInBatchMode() { - setUpCompositeRestoreListener(batchingStateRestoreCallback); - compositeRestoreListener.onRestoreEnd(topicPartition, storeName, numberRestored); - assertStateRestoreOnEndNotification(batchingStateRestoreCallback); - assertStateRestoreOnEndNotification(reportingStoreListener); - } - - @Test - public void shouldHandleNullReportStoreListener() { - compositeRestoreListener = new CompositeRestoreListener(batchingStateRestoreCallback); - compositeRestoreListener.setUserRestoreListener(null); - - compositeRestoreListener.restoreBatch(consumerRecords); - compositeRestoreListener.onRestoreStart(topicPartition, storeName, startOffset, endOffset); - compositeRestoreListener.onBatchRestored(topicPartition, storeName, batchOffset, numberRestored); - compositeRestoreListener.onRestoreEnd(topicPartition, storeName, numberRestored); - - assertThat(batchingStateRestoreCallback.getRestoredRecords(), is(records)); - assertStateRestoreOnEndNotification(batchingStateRestoreCallback); - } - - @Test - public void shouldHandleNoRestoreListener() { - compositeRestoreListener = new CompositeRestoreListener(noListenBatchingStateRestoreCallback); - compositeRestoreListener.setUserRestoreListener(null); - - compositeRestoreListener.restoreBatch(consumerRecords); - compositeRestoreListener.onRestoreStart(topicPartition, storeName, startOffset, endOffset); - compositeRestoreListener.onBatchRestored(topicPartition, storeName, batchOffset, numberRestored); - compositeRestoreListener.onRestoreEnd(topicPartition, storeName, numberRestored); - - assertThat(noListenBatchingStateRestoreCallback.restoredRecords, is(records)); - } - - @Test(expected = UnsupportedOperationException.class) - public void shouldThrowExceptionWhenSinglePutDirectlyCalled() { - compositeRestoreListener = new CompositeRestoreListener(noListenBatchingStateRestoreCallback); - compositeRestoreListener.restore(key, value); - } - - @Test(expected = UnsupportedOperationException.class) - public void shouldThrowExceptionWhenRestoreAllDirectlyCalled() { - compositeRestoreListener = new CompositeRestoreListener(noListenBatchingStateRestoreCallback); - compositeRestoreListener.restoreAll(Collections.emptyList()); - } - - private void assertStateRestoreListenerOnStartNotification(final MockStateRestoreListener restoreListener) { - assertTrue(restoreListener.storeNameCalledStates.containsKey(RESTORE_START)); - assertThat(restoreListener.restoreTopicPartition, is(topicPartition)); - assertThat(restoreListener.restoreStartOffset, is(startOffset)); - assertThat(restoreListener.restoreEndOffset, is(endOffset)); - } - - private void assertStateRestoreListenerOnBatchCompleteNotification(final MockStateRestoreListener restoreListener) { - assertTrue(restoreListener.storeNameCalledStates.containsKey(RESTORE_BATCH)); - assertThat(restoreListener.restoreTopicPartition, is(topicPartition)); - assertThat(restoreListener.restoredBatchOffset, is(batchOffset)); - assertThat(restoreListener.numBatchRestored, is(numberRestored)); - } - - private void assertStateRestoreOnEndNotification(final MockStateRestoreListener restoreListener) { - assertTrue(restoreListener.storeNameCalledStates.containsKey(RESTORE_END)); - assertThat(restoreListener.restoreTopicPartition, is(topicPartition)); - assertThat(restoreListener.totalNumRestored, is(numberRestored)); - } - - - private void setUpCompositeRestoreListener(final StateRestoreCallback stateRestoreCallback) { - compositeRestoreListener = new CompositeRestoreListener(stateRestoreCallback); - compositeRestoreListener.setUserRestoreListener(reportingStoreListener); - } - - - private static class MockStateRestoreCallback extends MockStateRestoreListener implements StateRestoreCallback { - - byte[] restoredKey; - byte[] restoredValue; - - @Override - public void restore(final byte[] key, final byte[] value) { - restoredKey = key; - restoredValue = value; - } - } - - private static class MockNoListenBatchingStateRestoreCallback implements BatchingStateRestoreCallback { - - Collection<KeyValue<byte[], byte[]>> restoredRecords; - - @Override - public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) { - restoredRecords = records; - } - - @Override - public void restore(final byte[] key, final byte[] value) { - throw new IllegalStateException("Should not be called"); - - } - } - -} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java index ad7dad6..0bba6aa 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java @@ -64,7 +64,7 @@ public class MockChangelogReader implements ChangelogReader { } @Override - public void unregister(final Collection<TopicPartition> partitions, final boolean triggerOnRestoreEnd) { + public void unregister(final Collection<TopicPartition> partitions) { restoringPartitions.removeAll(partitions); for (final TopicPartition partition : partitions) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index fdfb3c2..77ed3a4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -33,8 +33,8 @@ import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata; import org.apache.kafka.streams.state.TimestampedBytesStore; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; -import org.apache.kafka.test.MockBatchingStateRestoreListener; import org.apache.kafka.test.MockKeyValueStore; +import org.apache.kafka.test.MockRestoreCallback; import org.apache.kafka.test.TestUtils; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; @@ -51,7 +51,6 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -212,21 +211,21 @@ public class ProcessorStateManagerTest { @Test public void shouldRestoreStoreWithRestoreCallback() { - final MockBatchingStateRestoreListener batchingRestoreCallback = new MockBatchingStateRestoreListener(); + final MockRestoreCallback restoreCallback = new MockRestoreCallback(); final KeyValue<byte[], byte[]> expectedKeyValue = KeyValue.pair(keyBytes, valueBytes); final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE); try { - stateMgr.registerStore(persistentStore, batchingRestoreCallback); + stateMgr.registerStore(persistentStore, restoreCallback); final StateStoreMetadata storeMetadata = stateMgr.storeMetadata(persistentStorePartition); assertThat(storeMetadata, notNullValue()); stateMgr.restore(storeMetadata, singletonList(consumerRecord)); - assertThat(batchingRestoreCallback.getRestoredRecords().size(), is(1)); - assertTrue(batchingRestoreCallback.getRestoredRecords().contains(expectedKeyValue)); + assertThat(restoreCallback.restored.size(), is(1)); + assertTrue(restoreCallback.restored.contains(expectedKeyValue)); assertEquals(Collections.singletonMap(persistentStorePartition, 101L), stateMgr.changelogOffsets()); } finally { @@ -714,11 +713,8 @@ public class ProcessorStateManagerTest { public void shouldThrowIfRestoreCallbackThrows() { final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE); - stateMgr.registerStore(persistentStore, new MockBatchingStateRestoreListener() { - @Override - public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) { - throw new RuntimeException("KABOOM!"); - } + stateMgr.registerStore(persistentStore, (key, value) -> { + throw new RuntimeException("KABOOM!"); }); final StateStoreMetadata storeMetadata = stateMgr.storeMetadata(persistentStorePartition); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index 6956e79..6769f4a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -31,7 +31,6 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; -import org.apache.kafka.test.MockBatchingStateRestoreListener; import org.apache.kafka.test.MockStateRestoreListener; import org.apache.kafka.test.StreamsTestUtils; import org.easymock.EasyMock; @@ -114,11 +113,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport { private final KafkaException kaboom = new KafkaException("KABOOM!"); private final MockStateRestoreListener exceptionCallback = new MockStateRestoreListener() { @Override - public void restore(final byte[] key, final byte[] value) { - throw kaboom; - } - - @Override public void onRestoreStart(final TopicPartition tp, final String store, final long stOffset, final long edOffset) { throw kaboom; } @@ -897,7 +891,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport { assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp2).state()); // should support removing and clearing changelogs - changelogReader.unregister(Collections.singletonList(tp), false); + changelogReader.unregister(Collections.singletonList(tp)); assertNull(changelogReader.changelogMetadata(tp)); assertFalse(changelogReader.isEmpty()); assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp1).state()); @@ -910,79 +904,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport { } @Test - public void shouldTriggerRestoreCallbackAsListener() { - // do not need this test for standby task - if (type == STANDBY) - return; - - final MockBatchingStateRestoreListener restoreListener = new MockBatchingStateRestoreListener(); - EasyMock.expect(storeMetadata.restoreCallback()).andReturn(restoreListener).anyTimes(); - EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes(); - EasyMock.replay(stateManager, storeMetadata, store); - - final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) { - @Override - public Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) { - return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 11L)); - } - }; - consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L)); - - final StoreChangelogReader changelogReader = - new StoreChangelogReader(time, config, logContext, consumer, callback); - - changelogReader.register(tp, stateManager); - - changelogReader.restore(); - - assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state()); - assertEquals(0L, changelogReader.changelogMetadata(tp).totalRestored()); - assertEquals(5L, consumer.position(tp)); - assertEquals(Collections.emptySet(), consumer.paused()); - - assertEquals(11L, (long) changelogReader.changelogMetadata(tp).endOffset()); - - assertEquals(tp, callback.restoreTopicPartition); - assertEquals(storeName, callback.storeNameCalledStates.get(RESTORE_START)); - assertNull(callback.storeNameCalledStates.get(RESTORE_END)); - assertNull(callback.storeNameCalledStates.get(RESTORE_BATCH)); - assertEquals(5L, restoreListener.restoreStartOffset); - assertEquals(11L, restoreListener.restoreEndOffset); - assertEquals(storeName, restoreListener.storeNameCalledStates.get(RESTORE_START)); - - consumer.addRecord(new ConsumerRecord<>(topicName, 0, 6L, "key".getBytes(), "value".getBytes())); - consumer.addRecord(new ConsumerRecord<>(topicName, 0, 7L, "key".getBytes(), "value".getBytes())); - // null key should be ignored - consumer.addRecord(new ConsumerRecord<>(topicName, 0, 8L, null, "value".getBytes())); - consumer.addRecord(new ConsumerRecord<>(topicName, 0, 9L, "key".getBytes(), "value".getBytes())); - - changelogReader.restore(); - - assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state()); - assertEquals(3L, changelogReader.changelogMetadata(tp).totalRestored()); - assertEquals(0, changelogReader.changelogMetadata(tp).bufferedRecords().size()); - assertEquals(0, changelogReader.changelogMetadata(tp).bufferedLimitIndex()); - assertEquals(storeName, restoreListener.storeNameCalledStates.get(RESTORE_BATCH)); - - // consumer position bypassing the gap in the next poll - consumer.seek(tp, 11L); - - changelogReader.restore(); - - assertEquals(11L, consumer.position(tp)); - assertEquals(3L, changelogReader.changelogMetadata(tp).totalRestored()); - - assertEquals(StoreChangelogReader.ChangelogState.COMPLETED, changelogReader.changelogMetadata(tp).state()); - assertEquals(3L, changelogReader.changelogMetadata(tp).totalRestored()); - assertEquals(Collections.singleton(tp), changelogReader.completedChangelogs()); - assertEquals(Collections.singleton(tp), consumer.paused()); - - assertEquals(storeName, callback.storeNameCalledStates.get(RESTORE_BATCH)); - assertEquals(storeName, callback.storeNameCalledStates.get(RESTORE_END)); - assertEquals(storeName, restoreListener.storeNameCalledStates.get(RESTORE_END)); - } - - @Test public void shouldTransitState() { EasyMock.expect(storeMetadataOne.changelogPartition()).andReturn(tp1).anyTimes(); EasyMock.expect(storeMetadataOne.store()).andReturn(store).anyTimes(); @@ -1038,7 +959,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport { // transition to update standby is NOT idempotent assertThrows(IllegalStateException.class, changelogReader::transitToUpdateStandby); - changelogReader.unregister(Collections.singletonList(tp), false); + changelogReader.unregister(Collections.singletonList(tp)); changelogReader.register(tp, activeStateManager); // if a new active is registered, we should immediately transit to standby updating @@ -1092,7 +1013,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport { public void shouldNotThrowOnUnknownRevokedPartition() { LogCaptureAppender.setClassLoggerToDebug(StoreChangelogReader.class); try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StoreChangelogReader.class)) { - changelogReader.unregister(Collections.singletonList(new TopicPartition("unknown", 0)), false); + changelogReader.unregister(Collections.singletonList(new TopicPartition("unknown", 0))); assertThat( appender.getMessages(), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java index 5b0081c..4b3f9d5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java @@ -30,7 +30,6 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.SessionWindow; -import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; @@ -47,7 +46,6 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; -import org.rocksdb.Options; import org.rocksdb.WriteBatch; import java.io.File; @@ -144,8 +142,6 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment> abstract AbstractSegments<S> newSegments(); - abstract Options getOptions(S segment); - @Test public void shouldPutAndFetch() { final String key = "a"; @@ -381,11 +377,6 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment> // 2 segments are created during restoration. assertEquals(2, bytesStore.getSegments().size()); - // Bulk loading is disabled during recovery for stand-by tasks. - for (final S segment : bytesStore.getSegments()) { - assertThat(getOptions(segment).level0FileNumCompactionTrigger(), equalTo(taskType == TaskType.ACTIVE ? 1 << 30 : 4)); - } - final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>(); expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L)); expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 100L)); @@ -395,28 +386,6 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment> } @Test - public void shouldRespectBulkLoadOptionsDuringInit() { - bytesStore.init(context, bytesStore); - final String key = "a"; - bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L)); - bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), serializeValue(100L)); - assertEquals(2, bytesStore.getSegments().size()); - - final StateRestoreListener restoreListener = context.getRestoreListener(bytesStore.name()); - - restoreListener.onRestoreStart(null, bytesStore.name(), 0L, 0L); - - for (final S segment : bytesStore.getSegments()) { - assertThat(getOptions(segment).level0FileNumCompactionTrigger(), equalTo(1 << 30)); - } - - restoreListener.onRestoreEnd(null, bytesStore.name(), 0L); - for (final S segment : bytesStore.getSegments()) { - assertThat(getOptions(segment).level0FileNumCompactionTrigger(), equalTo(4)); - } - } - - @Test public void shouldLogAndMeasureExpiredRecordsWithBuiltInMetricsVersionLatest() { shouldLogAndMeasureExpiredRecords(StreamsConfig.METRICS_LATEST); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java index b9e49b2..3b6904f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.streams.state.internals; -import org.rocksdb.Options; - public class RocksDBSegmentedBytesStoreTest extends AbstractRocksDBSegmentedBytesStoreTest<KeyValueSegment> { private final static String METRICS_SCOPE = "metrics-scope"; @@ -37,9 +35,4 @@ public class RocksDBSegmentedBytesStoreTest extends AbstractRocksDBSegmentedByte KeyValueSegments newSegments() { return new KeyValueSegments(storeName, METRICS_SCOPE, retention, segmentInterval); } - - @Override - Options getOptions(final KeyValueSegment segment) { - return segment.getOptions(); - } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index f61b9c6..c86b914 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -33,7 +33,6 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.ProcessorStateException; -import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.KeyValueIterator; @@ -238,25 +237,6 @@ public class RocksDBStoreTest { } @Test - public void shouldRespectBulkloadOptionsDuringInit() { - rocksDBStore.init(context, rocksDBStore); - - final StateRestoreListener restoreListener = context.getRestoreListener(rocksDBStore.name()); - - restoreListener.onRestoreStart(null, rocksDBStore.name(), 0L, 0L); - - assertThat(rocksDBStore.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30)); - assertThat(rocksDBStore.getOptions().level0SlowdownWritesTrigger(), equalTo(1 << 30)); - assertThat(rocksDBStore.getOptions().level0StopWritesTrigger(), equalTo(1 << 30)); - - restoreListener.onRestoreEnd(null, rocksDBStore.name(), 0L); - - assertThat(rocksDBStore.getOptions().level0FileNumCompactionTrigger(), equalTo(10)); - assertThat(rocksDBStore.getOptions().level0SlowdownWritesTrigger(), equalTo(20)); - assertThat(rocksDBStore.getOptions().level0StopWritesTrigger(), equalTo(36)); - } - - @Test public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() { rocksDBStore.init(context, rocksDBStore); rocksDBStore.put(new Bytes("existingKey".getBytes(UTF_8)), "existingValue".getBytes(UTF_8)); @@ -331,36 +311,6 @@ public class RocksDBStoreTest { } @Test - public void shouldTogglePrepareForBulkloadSetting() { - rocksDBStore.init(context, rocksDBStore); - final RocksDBStore.RocksDBBatchingRestoreCallback restoreListener = - (RocksDBStore.RocksDBBatchingRestoreCallback) rocksDBStore.batchingStateRestoreCallback; - - restoreListener.onRestoreStart(null, null, 0, 0); - assertTrue("Should have set bulk loading to true", rocksDBStore.isPrepareForBulkload()); - - restoreListener.onRestoreEnd(null, null, 0); - assertFalse("Should have set bulk loading to false", rocksDBStore.isPrepareForBulkload()); - } - - @Test - public void shouldTogglePrepareForBulkloadSettingWhenPrexistingSstFiles() { - final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries(); - - rocksDBStore.init(context, rocksDBStore); - context.restore(rocksDBStore.name(), entries); - - final RocksDBStore.RocksDBBatchingRestoreCallback restoreListener = - (RocksDBStore.RocksDBBatchingRestoreCallback) rocksDBStore.batchingStateRestoreCallback; - - restoreListener.onRestoreStart(null, null, 0, 0); - assertTrue("Should have not set bulk loading to true", rocksDBStore.isPrepareForBulkload()); - - restoreListener.onRestoreEnd(null, null, 0); - assertFalse("Should have set bulk loading to false", rocksDBStore.isPrepareForBulkload()); - } - - @Test public void shouldRestoreAll() { final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStoreTest.java index 01510ff..814a04c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStoreTest.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.streams.state.internals; -import org.rocksdb.Options; - public class RocksDBTimestampedSegmentedBytesStoreTest extends AbstractRocksDBSegmentedBytesStoreTest<TimestampedSegment> { @@ -37,9 +35,4 @@ public class RocksDBTimestampedSegmentedBytesStoreTest TimestampedSegments newSegments() { return new TimestampedSegments(storeName, METRICS_SCOPE, retention, segmentInterval); } - - @Override - Options getOptions(final TimestampedSegment segment) { - return segment.getOptions(); - } } diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java index ed5d943..f1fd916 100644 --- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java @@ -29,12 +29,10 @@ import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateRestoreCallback; -import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.internals.AbstractProcessorContext; -import org.apache.kafka.streams.processor.internals.CompositeRestoreListener; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; @@ -393,31 +391,13 @@ public class InternalMockProcessorContext cache().addDirtyEntryFlushListener(namespace, listener); } - public StateRestoreListener getRestoreListener(final String storeName) { - return getStateRestoreListener(restoreFuncs.get(storeName)); - } - public void restore(final String storeName, final Iterable<KeyValue<byte[], byte[]>> changeLog) { final RecordBatchingStateRestoreCallback restoreCallback = adapt(restoreFuncs.get(storeName)); - final StateRestoreListener restoreListener = getRestoreListener(storeName); - - restoreListener.onRestoreStart(null, storeName, 0L, 0L); final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>(); for (final KeyValue<byte[], byte[]> keyValue : changeLog) { records.add(new ConsumerRecord<>("", 0, 0L, keyValue.key, keyValue.value)); } - restoreCallback.restoreBatch(records); - - restoreListener.onRestoreEnd(null, storeName, 0L); - } - - private StateRestoreListener getStateRestoreListener(final StateRestoreCallback restoreCallback) { - if (restoreCallback instanceof StateRestoreListener) { - return (StateRestoreListener) restoreCallback; - } - - return CompositeRestoreListener.NO_OP_STATE_RESTORE_LISTENER; } } diff --git a/streams/src/test/java/org/apache/kafka/test/MockBatchingStateRestoreListener.java b/streams/src/test/java/org/apache/kafka/test/MockBatchingStateRestoreListener.java deleted file mode 100644 index 1736a54..0000000 --- a/streams/src/test/java/org/apache/kafka/test/MockBatchingStateRestoreListener.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.test; - -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.processor.BatchingStateRestoreCallback; - -import java.util.ArrayList; -import java.util.Collection; - -public class MockBatchingStateRestoreListener extends MockStateRestoreListener implements BatchingStateRestoreCallback { - - private final Collection<KeyValue<byte[], byte[]>> restoredRecords = new ArrayList<>(); - - @Override - public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) { - restoredRecords.addAll(records); - } - - @Override - public void restore(final byte[] key, final byte[] value) { - throw new IllegalStateException("Should not be called"); - - } - - public Collection<KeyValue<byte[], byte[]>> getRestoredRecords() { - return restoredRecords; - } -} diff --git a/streams/src/test/java/org/apache/kafka/test/MockRestoreCallback.java b/streams/src/test/java/org/apache/kafka/test/MockRestoreCallback.java index 096fa11..fa5b465 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockRestoreCallback.java +++ b/streams/src/test/java/org/apache/kafka/test/MockRestoreCallback.java @@ -25,7 +25,6 @@ import java.util.List; public class MockRestoreCallback implements StateRestoreCallback { public List<KeyValue<byte[], byte[]>> restored = new ArrayList<>(); - @Override public void restore(final byte[] key, final byte[] value) { restored.add(KeyValue.pair(key, value)); diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java b/streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java index 3138167..1026969 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java +++ b/streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java @@ -18,19 +18,15 @@ package org.apache.kafka.test; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.processor.AbstractNotifyingRestoreCallback; +import org.apache.kafka.streams.processor.StateRestoreListener; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; -public class MockStateRestoreListener extends AbstractNotifyingRestoreCallback { +public class MockStateRestoreListener implements StateRestoreListener { // verifies store name called for each state public final Map<String, String> storeNameCalledStates = new HashMap<>(); - public final List<KeyValue<byte[], byte[]>> restored = new ArrayList<>(); public long restoreStartOffset; public long restoreEndOffset; public long restoredBatchOffset; @@ -43,11 +39,6 @@ public class MockStateRestoreListener extends AbstractNotifyingRestoreCallback { public static final String RESTORE_END = "restore_end"; @Override - public void restore(final byte[] key, final byte[] value) { - restored.add(KeyValue.pair(key, value)); - } - - @Override public void onRestoreStart(final TopicPartition topicPartition, final String storeName, final long startingOffset, @@ -82,7 +73,6 @@ public class MockStateRestoreListener extends AbstractNotifyingRestoreCallback { public String toString() { return "MockStateRestoreListener{" + "storeNameCalledStates=" + storeNameCalledStates + - ", restored=" + restored + ", restoreStartOffset=" + restoreStartOffset + ", restoreEndOffset=" + restoreEndOffset + ", restoredBatchOffset=" + restoredBatchOffset +