KAFKA-5949; User Callback Exceptions need to be handled properly - catch user exception in user callback (TimestampExtractor, DeserializationHandler, StateRestoreListener) and wrap with StreamsException
Additional cleanup: - rename globalRestoreListener to userRestoreListener - remove unnecessary interface -> collapse SourceNodeRecordDeserializer and RecordDeserializer - removed unused parameter loggingEnabled from ProcessorContext#register Author: Matthias J. Sax <[email protected]> Reviewers: Bill Bejeck <[email protected]>, Guozhang Wang <[email protected]>, Damian Guy <[email protected]> Closes #3939 from mjsax/kafka-5949-exceptions-user-callbacks Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e5f2471c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e5f2471c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e5f2471c Branch: refs/heads/trunk Commit: e5f2471c548fc490a42dd0321bcf7fcdd4ddc52d Parents: 2703fda Author: Matthias J. Sax <[email protected]> Authored: Thu Sep 28 11:00:31 2017 +0100 Committer: Damian Guy <[email protected]> Committed: Thu Sep 28 11:00:31 2017 +0100 ---------------------------------------------------------------------- .../errors/LogAndContinueExceptionHandler.java | 6 +- .../errors/LogAndFailExceptionHandler.java | 8 +- .../kafka/streams/kstream/ValueTransformer.java | 2 + .../internals/KStreamTransformValues.java | 6 +- .../streams/processor/ProcessorContext.java | 11 +- .../kafka/streams/processor/StateStore.java | 4 + .../internals/AbstractProcessorContext.java | 4 +- .../processor/internals/AbstractTask.java | 6 +- .../processor/internals/AssignedTasks.java | 5 + .../internals/CompositeRestoreListener.java | 52 +++++++-- .../processor/internals/GlobalStateManager.java | 6 + .../internals/GlobalStateManagerImpl.java | 1 - .../internals/GlobalStateUpdateTask.java | 21 ++-- .../processor/internals/GlobalStreamThread.java | 7 +- .../internals/InternalTopologyBuilder.java | 8 +- .../internals/ProcessorStateManager.java | 13 +-- .../processor/internals/RecordDeserializer.java | 70 +++++++++++- .../processor/internals/RecordQueue.java | 28 +++-- .../internals/SourceNodeRecordDeserializer.java | 90 --------------- .../processor/internals/StateManager.java | 8 +- .../processor/internals/StateRestorer.java | 4 +- .../internals/StoreChangelogReader.java | 8 +- .../streams/processor/internals/StreamTask.java | 3 +- .../processor/internals/StreamThread.java | 12 +- .../kafka/streams/processor/internals/Task.java | 3 + .../processor/internals/TaskManager.java | 5 +- .../state/internals/InMemoryKeyValueStore.java | 2 +- .../streams/state/internals/MemoryLRUCache.java | 2 +- .../streams/processor/TopologyBuilderTest.java | 2 +- .../internals/CompositeRestoreListenerTest.java | 8 +- .../internals/GlobalStateManagerImplTest.java | 42 +++---- .../internals/GlobalStateTaskTest.java | 20 +++- .../internals/InternalTopologyBuilderTest.java | 3 + .../processor/internals/PartitionGroupTest.java | 18 ++- .../internals/ProcessorStateManagerTest.java | 50 ++++----- .../internals/RecordDeserializerTest.java | 98 ++++++++++++++++ .../processor/internals/RecordQueueTest.java | 32 ++++-- .../SourceNodeRecordDeserializerTest.java | 111 ------------------- .../processor/internals/StandbyTaskTest.java | 2 +- .../processor/internals/StateManagerStub.java | 2 +- .../processor/internals/StateRestorerTest.java | 2 +- .../internals/StoreChangelogReaderTest.java | 2 +- .../internals/StreamPartitionAssignorTest.java | 15 ++- .../processor/internals/StreamTaskTest.java | 4 +- .../kafka/test/GlobalStateManagerStub.java | 2 +- .../apache/kafka/test/MockProcessorContext.java | 4 +- .../kafka/test/MockStateStoreSupplier.java | 39 +++---- .../apache/kafka/test/NoOpProcessorContext.java | 4 +- .../kafka/test/ProcessorTopologyTestDriver.java | 5 +- 49 files changed, 478 insertions(+), 382 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java index dde4b52..b2ef45b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java @@ -38,9 +38,9 @@ public class LogAndContinueExceptionHandler implements DeserializationExceptionH final Exception exception) { log.warn("Exception caught during Deserialization, " + - "taskId: {}, topic: {}, partition: {}, offset: {}", - context.taskId(), record.topic(), record.partition(), record.offset(), - exception); + "taskId: {}, topic: {}, partition: {}, offset: {}", + context.taskId(), record.topic(), record.partition(), record.offset(), + exception); return DeserializationHandlerResponse.CONTINUE; } http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java index 23557a3..60af32f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java @@ -37,10 +37,10 @@ public class LogAndFailExceptionHandler implements DeserializationExceptionHandl final ConsumerRecord<byte[], byte[]> record, final Exception exception) { - log.warn("Exception caught during Deserialization, " + - "taskId: {}, topic: {}, partition: {}, offset: {}", - context.taskId(), record.topic(), record.partition(), record.offset(), - exception); + log.error("Exception caught during Deserialization, " + + "taskId: {}, topic: {}, partition: {}, offset: {}", + context.taskId(), record.topic(), record.partition(), record.offset(), + exception); return DeserializationHandlerResponse.FAIL; } http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java index 5463a76..0a8e890 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java @@ -62,6 +62,8 @@ public interface ValueTransformer<V, VR> { * {@code ValueTransformer} and will result in an {@link StreamsException exception}. * * @param context the context + * @throws IllegalStateException If store gets registered after initialization is already finished + * @throws StreamsException if the store's change log does not contain the partition */ void init(final ProcessorContext context); http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java index ab1c302..55c16cc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java @@ -91,8 +91,10 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V> } @Override - public void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback) { - context.register(store, loggingEnabled, stateRestoreCallback); + public void register(final StateStore store, + final boolean deprecatedAndIgnoredLoggingEnabled, + final StateRestoreCallback stateRestoreCallback) { + context.register(store, deprecatedAndIgnoredLoggingEnabled, stateRestoreCallback); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java index cdf1612..385d641 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.StreamsMetrics; +import org.apache.kafka.streams.errors.StreamsException; import java.io.File; import java.util.Map; @@ -75,8 +76,14 @@ public interface ProcessorContext { * Registers and possibly restores the specified storage engine. * * @param store the storage engine - */ - void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback); + * @param loggingEnabledIsDeprecatedAndIgnored deprecated parameter {@code loggingEnabled} is ignored: + * if you want to enable logging on a state stores call + * {@link org.apache.kafka.streams.state.StoreBuilder#withLoggingEnabled(Map)} + * when creating the store + * @throws IllegalStateException If store gets registered after initialized is already finished + * @throws StreamsException if the store's change log does not contain the partition + */ + void register(StateStore store, boolean loggingEnabledIsDeprecatedAndIgnored, StateRestoreCallback stateRestoreCallback); /** * Get the state store given the store name. http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java index 3925951..cb8139c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.streams.processor; +import org.apache.kafka.streams.errors.StreamsException; + /** * A storage engine for managing state maintained by a stream processor. * @@ -36,6 +38,8 @@ public interface StateStore { /** * Initializes this state store + * @throws IllegalStateException If store gets registered after initialized is already finished + * @throws StreamsException if the store's change log does not contain the partition */ void init(ProcessorContext context, StateStore root); http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java index 9e853fd..410212e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java @@ -93,13 +93,13 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte @Override public void register(final StateStore store, - final boolean loggingEnabled, + final boolean deprecatedAndIgnoredLoggingEnabled, final StateRestoreCallback stateRestoreCallback) { if (initialized) { throw new IllegalStateException("Can only create state stores during initialization."); } Objects.requireNonNull(store, "store must not be null"); - stateManager.register(store, loggingEnabled, stateRestoreCallback); + stateManager.register(store, stateRestoreCallback); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 6734da6..c24686e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -90,7 +90,7 @@ public abstract class AbstractTask implements Task { topology.storeToChangelogTopic(), changelogReader, eosEnabled, - logContext); + logContext); } catch (final IOException e) { throw new ProcessorStateException(String.format("%sError while creating the state manager", logPrefix), e); } @@ -196,6 +196,10 @@ public abstract class AbstractTask implements Task { stateMgr.flush(); } + /** + * @throws IllegalStateException If store gets registered after initialized is already finished + * @throws StreamsException if the store's change log does not contain the partition + */ void initializeStateStores() { if (topology.stateStores().isEmpty()) { return; http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index e51ebd7..fcb717d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.LockException; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.TaskId; import org.slf4j.Logger; @@ -107,6 +108,10 @@ class AssignedTasks { return partitions; } + /** + * @throws IllegalStateException If store gets registered after initialized is already finished + * @throws StreamsException if the store's change log does not contain the partition + */ void initializeNewTasks() { if (!created.isEmpty()) { log.debug("Initializing {}s {}", taskTypeName, created.keySet()); http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java index 138be77..a1c2f7f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java @@ -20,6 +20,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback; import org.apache.kafka.streams.processor.BatchingStateRestoreCallback; import org.apache.kafka.streams.processor.StateRestoreCallback; @@ -32,7 +33,7 @@ public class CompositeRestoreListener implements BatchingStateRestoreCallback, S public static final NoOpStateRestoreListener NO_OP_STATE_RESTORE_LISTENER = new NoOpStateRestoreListener(); private final BatchingStateRestoreCallback internalBatchingRestoreCallback; private final StateRestoreListener storeRestoreListener; - private StateRestoreListener globalRestoreListener = NO_OP_STATE_RESTORE_LISTENER; + private StateRestoreListener userRestoreListener = NO_OP_STATE_RESTORE_LISTENER; CompositeRestoreListener(final StateRestoreCallback stateRestoreCallback) { @@ -45,31 +46,66 @@ public class CompositeRestoreListener implements BatchingStateRestoreCallback, S internalBatchingRestoreCallback = getBatchingRestoreCallback(stateRestoreCallback); } + /** + * @throws StreamsException if user provided {@link StateRestoreListener} raises an exception in + * {@link StateRestoreListener#onRestoreStart(TopicPartition, String, long, long)} + */ @Override public void onRestoreStart(final TopicPartition topicPartition, final String storeName, final long startingOffset, final long endingOffset) { - globalRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset); + try { + userRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset); + } catch (final Exception fatalUserException) { + throw new StreamsException( + String.format("Fatal user code error in store restore listener for store %s, partition %s.", + storeName, + topicPartition), + fatalUserException); + } storeRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset); } + /** + * @throws StreamsException if user provided {@link StateRestoreListener} raises an exception in + * {@link StateRestoreListener#onBatchRestored(TopicPartition, String, long, long)} + */ @Override public void onBatchRestored(final TopicPartition topicPartition, final String storeName, final long batchEndOffset, final long numRestored) { - globalRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored); + try { + userRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored); + } catch (final Exception fatalUserException) { + throw new StreamsException( + String.format("Fatal user code error in store restore listener for store %s, partition %s.", + storeName, + topicPartition), + fatalUserException); + } storeRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored); } + /** + * @throws StreamsException if user provided {@link StateRestoreListener} raises an exception in + * {@link StateRestoreListener#onRestoreEnd(TopicPartition, String, long)} + */ @Override public void onRestoreEnd(final TopicPartition topicPartition, final String storeName, final long totalRestored) { - globalRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored); + try { + userRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored); + } catch (final Exception fatalUserException) { + throw new StreamsException( + String.format("Fatal user code error in store restore listener for store %s, partition %s.", + storeName, + topicPartition), + fatalUserException); + } storeRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored); - } @Override @@ -77,9 +113,9 @@ public class CompositeRestoreListener implements BatchingStateRestoreCallback, S internalBatchingRestoreCallback.restoreAll(records); } - void setGlobalRestoreListener(final StateRestoreListener globalRestoreListener) { - if (globalRestoreListener != null) { - this.globalRestoreListener = globalRestoreListener; + void setUserRestoreListener(final StateRestoreListener userRestoreListener) { + if (userRestoreListener != null) { + this.userRestoreListener = userRestoreListener; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java index b058844..c9b8ca8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java @@ -16,8 +16,14 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.streams.errors.StreamsException; + import java.util.Set; public interface GlobalStateManager extends StateManager { + /** + * @throws IllegalStateException If store gets registered after initialized is already finished + * @throws StreamsException if the store's change log does not contain the partition + */ Set<String> initialize(InternalProcessorContext processorContext); } http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index d03425b..10a0775 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -120,7 +120,6 @@ public class GlobalStateManagerImpl implements GlobalStateManager { } public void register(final StateStore store, - final boolean ignored, final StateRestoreCallback stateRestoreCallback) { if (stores.containsKey(store.name())) { http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index 4c2b40f..849af57 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -18,7 +18,9 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; +import org.apache.kafka.streams.errors.StreamsException; import java.io.IOException; import java.util.HashMap; @@ -33,29 +35,34 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer { private final ProcessorTopology topology; private final InternalProcessorContext processorContext; private final Map<TopicPartition, Long> offsets = new HashMap<>(); - private final Map<String, SourceNodeRecordDeserializer> deserializers = new HashMap<>(); + private final Map<String, RecordDeserializer> deserializers = new HashMap<>(); private final GlobalStateManager stateMgr; private final DeserializationExceptionHandler deserializationExceptionHandler; - + private final LogContext logContext; public GlobalStateUpdateTask(final ProcessorTopology topology, final InternalProcessorContext processorContext, final GlobalStateManager stateMgr, - final DeserializationExceptionHandler deserializationExceptionHandler) { - + final DeserializationExceptionHandler deserializationExceptionHandler, + final LogContext logContext) { this.topology = topology; this.stateMgr = stateMgr; this.processorContext = processorContext; this.deserializationExceptionHandler = deserializationExceptionHandler; + this.logContext = logContext; } + /** + * @throws IllegalStateException If store gets registered after initialized is already finished + * @throws StreamsException if the store's change log does not contain the partition + */ public Map<TopicPartition, Long> initialize() { final Set<String> storeNames = stateMgr.initialize(processorContext); final Map<String, String> storeNameToTopic = topology.storeToChangelogTopic(); for (final String storeName : storeNames) { final String sourceTopic = storeNameToTopic.get(storeName); final SourceNode source = topology.source(sourceTopic); - deserializers.put(sourceTopic, new SourceNodeRecordDeserializer(source, deserializationExceptionHandler)); + deserializers.put(sourceTopic, new RecordDeserializer(source, deserializationExceptionHandler, logContext)); } initTopology(); processorContext.initialized(); @@ -66,8 +73,8 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer { @SuppressWarnings("unchecked") @Override public void update(final ConsumerRecord<byte[], byte[]> record) { - final SourceNodeRecordDeserializer sourceNodeAndDeserializer = deserializers.get(record.topic()); - final ConsumerRecord<Object, Object> deserialized = sourceNodeAndDeserializer.tryDeserialize(processorContext, record); + final RecordDeserializer sourceNodeAndDeserializer = deserializers.get(record.topic()); + final ConsumerRecord<Object, Object> deserialized = sourceNodeAndDeserializer.deserialize(processorContext, record); if (deserialized != null) { final ProcessorRecordContext recordContext = http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java index a365add..1ee49e1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java @@ -220,6 +220,10 @@ public class GlobalStreamThread extends Thread { this.flushInterval = flushInterval; } + /** + * @throws IllegalStateException If store gets registered after initialized is already finished + * @throws StreamsException if the store's change log does not contain the partition + */ void initialize() { final Map<TopicPartition, Long> partitionOffsets = stateMaintainer.initialize(); consumer.assign(partitionOffsets.keySet()); @@ -312,7 +316,8 @@ public class GlobalStreamThread extends Thread { streamsMetrics, cache), stateMgr, - config.defaultDeserializationExceptionHandler()), + config.defaultDeserializationExceptionHandler(), + logContext), time, config.getLong(StreamsConfig.POLL_MS_CONFIG), config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)); http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 81d2f6c..06405ef 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -178,8 +178,10 @@ public class InternalTopologyBuilder { } private static class StateStoreSupplierFactory extends AbstractStateStoreFactory { + @SuppressWarnings("deprecation") private final StateStoreSupplier supplier; + @SuppressWarnings("deprecation") StateStoreSupplierFactory(final StateStoreSupplier<?> supplier) { super(supplier.name(), supplier.loggingEnabled(), @@ -495,6 +497,7 @@ public class InternalTopologyBuilder { nodeGrouper.unite(name, predecessorNames); } + @SuppressWarnings("deprecation") public final void addStateStore(final StateStoreSupplier supplier, final String... processorNames) { Objects.requireNonNull(supplier, "supplier can't be null"); @@ -527,6 +530,7 @@ public class InternalTopologyBuilder { } } + @SuppressWarnings("deprecation") public final void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier, final String sourceName, final TimestampExtractor timestampExtractor, @@ -1612,8 +1616,8 @@ public class InternalTopologyBuilder { return Collections.unmodifiableSet(nodes); } - // only for testing - public Iterator<TopologyDescription.Node> nodesInOrder() { + // visible for testing + Iterator<TopologyDescription.Node> nodesInOrder() { return nodes.iterator(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 2f16547..cc14c67 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.ProcessorStateException; -import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.BatchingStateRestoreCallback; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; @@ -41,8 +40,7 @@ import java.util.Map; public class ProcessorStateManager implements StateManager { - - public static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog"; + private static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog"; static final String CHECKPOINT_FILE_NAME = ".checkpoint"; private final Logger log; @@ -119,17 +117,8 @@ public class ProcessorStateManager implements StateManager { return baseDir; } - /** - * @throws IllegalArgumentException if the store name has already been registered or if it is not a valid name - * (e.g., when it conflicts with the names of internal topics, like the checkpoint file name) - * - * // TODO: parameter loggingEnabled can be removed now - * - * @throws StreamsException if the store's change log does not contain the partition - */ @Override public void register(final StateStore store, - final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback) { log.debug("Registering state store {} to its state manager", store.name()); http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java index 4e04108..1b5f764 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java @@ -17,7 +17,73 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.errors.DeserializationExceptionHandler; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.slf4j.Logger; -interface RecordDeserializer { - ConsumerRecord<Object, Object> deserialize(final ConsumerRecord<byte[], byte[]> rawRecord); +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG; + +class RecordDeserializer { + private final SourceNode sourceNode; + private final DeserializationExceptionHandler deserializationExceptionHandler; + private final Logger log; + + RecordDeserializer(final SourceNode sourceNode, + final DeserializationExceptionHandler deserializationExceptionHandler, + final LogContext logContext) { + this.sourceNode = sourceNode; + this.deserializationExceptionHandler = deserializationExceptionHandler; + this.log = logContext.logger(RecordDeserializer.class); + } + + /** + * @throws StreamsException if a deserialization error occurs and the deserialization callback returns + * {@link DeserializationExceptionHandler.DeserializationHandlerResponse#FAIL FAIL} or throws an exception itself + */ + @SuppressWarnings("deprecation") + ConsumerRecord<Object, Object> deserialize(final ProcessorContext processorContext, + final ConsumerRecord<byte[], byte[]> rawRecord) { + + try { + return new ConsumerRecord<>( + rawRecord.topic(), + rawRecord.partition(), + rawRecord.offset(), + rawRecord.timestamp(), + TimestampType.CREATE_TIME, + rawRecord.checksum(), + rawRecord.serializedKeySize(), + rawRecord.serializedValueSize(), + sourceNode.deserializeKey(rawRecord.topic(), rawRecord.headers(), rawRecord.key()), + sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), rawRecord.value())); + } catch (final Exception deserializationException) { + final DeserializationExceptionHandler.DeserializationHandlerResponse response; + try { + response = deserializationExceptionHandler.handle(processorContext, rawRecord, deserializationException); + } catch (final Exception fatalUserException) { + log.error("Deserialization error callback failed after deserialization error for record {}", + rawRecord, + deserializationException); + throw new StreamsException("Fatal user code error in deserialization error callback", fatalUserException); + } + + if (response == DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL) { + throw new StreamsException("Deserialization exception handler is set to fail upon" + + " a deserialization error. If you would rather have the streaming pipeline" + + " continue after a deserialization error, please set the " + + DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.", + deserializationException); + } else { + sourceNode.nodeMetrics.sourceNodeSkippedDueToDeserializationError.record(); + } + } + return null; + } + + SourceNode sourceNode() { + return sourceNode; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index 889b6d8..e6facaf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -18,11 +18,12 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.TimestampExtractor; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayDeque; @@ -33,16 +34,14 @@ import java.util.ArrayDeque; * timestamp is monotonically increasing such that once it is advanced, it will not be decremented. */ public class RecordQueue { - - private static final Logger log = LoggerFactory.getLogger(RecordQueue.class); - private final SourceNode source; private final TimestampExtractor timestampExtractor; private final TopicPartition partition; private final ArrayDeque<StampedRecord> fifoQueue; private final TimestampTracker<ConsumerRecord<Object, Object>> timeTracker; - private final SourceNodeRecordDeserializer recordDeserializer; + private final RecordDeserializer recordDeserializer; private final ProcessorContext processorContext; + private final Logger log; private long partitionTime = TimestampTracker.NOT_KNOWN; @@ -50,14 +49,16 @@ public class RecordQueue { final SourceNode source, final TimestampExtractor timestampExtractor, final DeserializationExceptionHandler deserializationExceptionHandler, - final ProcessorContext processorContext) { + final ProcessorContext processorContext, + final LogContext logContext) { this.partition = partition; this.source = source; this.timestampExtractor = timestampExtractor; this.fifoQueue = new ArrayDeque<>(); this.timeTracker = new MinTimestampTracker<>(); - this.recordDeserializer = new SourceNodeRecordDeserializer(source, deserializationExceptionHandler); + this.recordDeserializer = new RecordDeserializer(source, deserializationExceptionHandler, logContext); this.processorContext = processorContext; + this.log = logContext.logger(RecordQueue.class); } /** @@ -87,12 +88,21 @@ public class RecordQueue { int addRawRecords(final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) { for (final ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) { - final ConsumerRecord<Object, Object> record = recordDeserializer.tryDeserialize(processorContext, rawRecord); + final ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(processorContext, rawRecord); if (record == null) { continue; } - final long timestamp = timestampExtractor.extract(record, timeTracker.get()); + final long timestamp; + try { + timestamp = timestampExtractor.extract(record, timeTracker.get()); + } catch (final StreamsException internalFatalExtractorException) { + throw internalFatalExtractorException; + } catch (final Exception fatalUserException) { + throw new StreamsException( + String.format("Fatal user code error in TimestampExtractor callback for record %s.", record), + fatalUserException); + } log.trace("Source node {} extracted timestamp {} for record {}", source.name(), timestamp, record); // drop message if TS is invalid, i.e., negative http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java deleted file mode 100644 index 7fde881..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.processor.internals; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.record.TimestampType; -import org.apache.kafka.streams.errors.DeserializationExceptionHandler; -import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.processor.ProcessorContext; - -import static java.lang.String.format; -import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG; - -class SourceNodeRecordDeserializer implements RecordDeserializer { - private final SourceNode sourceNode; - private final DeserializationExceptionHandler deserializationExceptionHandler; - - SourceNodeRecordDeserializer(final SourceNode sourceNode, - final DeserializationExceptionHandler deserializationExceptionHandler) { - this.sourceNode = sourceNode; - this.deserializationExceptionHandler = deserializationExceptionHandler; - } - - @SuppressWarnings("deprecation") - @Override - public ConsumerRecord<Object, Object> deserialize(final ConsumerRecord<byte[], byte[]> rawRecord) { - final Object key; - try { - key = sourceNode.deserializeKey(rawRecord.topic(), rawRecord.headers(), rawRecord.key()); - } catch (Exception e) { - throw new StreamsException(format("Failed to deserialize key for record. topic=%s, partition=%d, offset=%d", - rawRecord.topic(), rawRecord.partition(), rawRecord.offset()), e); - } - - final Object value; - try { - value = sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), rawRecord.value()); - } catch (Exception e) { - throw new StreamsException(format("Failed to deserialize value for record. topic=%s, partition=%d, offset=%d", - rawRecord.topic(), rawRecord.partition(), rawRecord.offset()), e); - } - - return new ConsumerRecord<>(rawRecord.topic(), rawRecord.partition(), rawRecord.offset(), - rawRecord.timestamp(), TimestampType.CREATE_TIME, - rawRecord.checksum(), - rawRecord.serializedKeySize(), - rawRecord.serializedValueSize(), key, value); - - } - - public ConsumerRecord<Object, Object> tryDeserialize(final ProcessorContext processorContext, - final ConsumerRecord<byte[], byte[]> rawRecord) { - - // catch and process if we have a deserialization handler - try { - return deserialize(rawRecord); - } catch (final Exception e) { - final DeserializationExceptionHandler.DeserializationHandlerResponse response = - deserializationExceptionHandler.handle(processorContext, rawRecord, e); - if (response == DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL) { - throw new StreamsException("Deserialization exception handler is set to fail upon" + - " a deserialization error. If you would rather have the streaming pipeline" + - " continue after a deserialization error, please set the " + - DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.", - e); - } else { - sourceNode.nodeMetrics.sourceNodeSkippedDueToDeserializationError.record(); - } - } - return null; - } - - public SourceNode sourceNode() { - return sourceNode; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java index 511280d..2a8d9a3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; @@ -27,7 +28,12 @@ import java.util.Map; interface StateManager extends Checkpointable { File baseDir(); - void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback); + /** + * @throws IllegalArgumentException if the store name has already been registered or if it is not a valid name + * (e.g., when it conflicts with the names of internal topics, like the checkpoint file name) + * @throws StreamsException if the store's change log does not contain the partition + */ + void register(final StateStore store, final StateRestoreCallback stateRestoreCallback); void flush(); http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java index 579561f..33dce9e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java @@ -79,8 +79,8 @@ public class StateRestorer { return persistent; } - void setGlobalRestoreListener(StateRestoreListener globalStateRestoreListener) { - this.compositeRestoreListener.setGlobalRestoreListener(globalStateRestoreListener); + void setUserRestoreListener(StateRestoreListener userRestoreListener) { + this.compositeRestoreListener.setUserRestoreListener(userRestoreListener); } void setRestoredOffset(final long restoredOffset) { http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index caa0100..4ba860d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -41,7 +41,7 @@ public class StoreChangelogReader implements ChangelogReader { private final Logger log; private final Consumer<byte[], byte[]> consumer; - private final StateRestoreListener stateRestoreListener; + private final StateRestoreListener userStateRestoreListener; private final Map<TopicPartition, Long> endOffsets = new HashMap<>(); private final Map<String, List<PartitionInfo>> partitionInfo = new HashMap<>(); private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>(); @@ -50,11 +50,11 @@ public class StoreChangelogReader implements ChangelogReader { public StoreChangelogReader(final String threadId, final Consumer<byte[], byte[]> consumer, - final StateRestoreListener stateRestoreListener, + final StateRestoreListener userStateRestoreListener, final LogContext logContext) { this.consumer = consumer; this.log = logContext.logger(getClass()); - this.stateRestoreListener = stateRestoreListener; + this.userStateRestoreListener = userStateRestoreListener; } public StoreChangelogReader(final Consumer<byte[], byte[]> consumer, @@ -65,7 +65,7 @@ public class StoreChangelogReader implements ChangelogReader { @Override public void register(final StateRestorer restorer) { - restorer.setGlobalRestoreListener(stateRestoreListener); + restorer.setUserRestoreListener(userStateRestoreListener); stateRestorers.put(restorer.partition(), restorer); needsInitializing.put(restorer.partition(), restorer); } http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 6775edb..8c26fa9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -136,7 +136,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator for (final TopicPartition partition : partitions) { final SourceNode source = topology.source(partition.topic()); final TimestampExtractor sourceTimestampExtractor = source.getTimestampExtractor() != null ? source.getTimestampExtractor() : defaultTimestampExtractor; - final RecordQueue queue = new RecordQueue(partition, source, sourceTimestampExtractor, defaultDeserializationExceptionHandler, processorContext); + final RecordQueue queue = new RecordQueue(partition, source, sourceTimestampExtractor, defaultDeserializationExceptionHandler, processorContext, logContext); partitionQueues.put(partition, queue); } @@ -151,6 +151,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator } } + @Override public boolean initialize() { log.trace("Initializing"); initializeStateStores(); http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index e141c46..8d13558 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -641,7 +641,7 @@ public class StreamThread extends Thread implements ThreadDataProvider { final StreamsMetadataState streamsMetadataState, final long cacheSizeBytes, final StateDirectory stateDirectory, - final StateRestoreListener stateRestoreListener) { + final StateRestoreListener userStateRestoreListener) { final String threadClientId = clientId + "-StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement(); final StreamsMetricsThreadImpl streamsMetrics = new StreamsMetricsThreadImpl(metrics, @@ -666,8 +666,8 @@ public class StreamThread extends Thread implements ThreadDataProvider { final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(consumerConfigs); final StoreChangelogReader changelogReader = new StoreChangelogReader(threadClientId, restoreConsumer, - stateRestoreListener, - logContext); + userStateRestoreListener, + logContext); Producer<byte[], byte[]> threadProducer = null; if (!eosEnabled) { @@ -757,6 +757,8 @@ public class StreamThread extends Thread implements ThreadDataProvider { /** * Main event loop for polling, and processing records through topologies. + * @throws IllegalStateException If store gets registered after initialized is already finished + * @throws StreamsException if the store's change log does not contain the partition */ private void runLoop() { long recordsProcessedBeforeCommit = UNLIMITED_RECORDS; @@ -767,6 +769,10 @@ public class StreamThread extends Thread implements ThreadDataProvider { } } + /** + * @throws IllegalStateException If store gets registered after initialized is already finished + * @throws StreamsException if the store's change log does not contain the partition + */ // Visible for testing long runOnce(final long recordsProcessedBeforeCommit) { long processedBeforeCommit = recordsProcessedBeforeCommit; http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java index a481c31..80e5423 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; @@ -71,6 +72,8 @@ public interface Task { /** * initialize the task and return true if the task is ready to run, i.e, it has not state stores * @return true if this task has no state stores that may need restoring. + * @throws IllegalStateException If store gets registered after initialized is already finished + * @throws StreamsException if the store's change log does not contain the partition */ boolean initialize(); http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index f12ed91..278957e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -239,7 +239,10 @@ class TaskManager { this.consumer = consumer; } - + /** + * @throws IllegalStateException If store gets registered after initialized is already finished + * @throws StreamsException if the store's change log does not contain the partition + */ boolean updateNewAndRestoringTasks() { active.initializeNewTasks(); standby.initializeNewTasks(); http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index 7e24969..929d584 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -72,7 +72,7 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> { if (root != null) { // register the store - context.register(root, true, new StateRestoreCallback() { + context.register(root, false, new StateRestoreCallback() { @Override public void restore(byte[] key, byte[] value) { // this is a delete http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java index beb9ce1..f1aa96f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java @@ -108,7 +108,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> { valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); // register the store - context.register(root, true, new StateRestoreCallback() { + context.register(root, false, new StateRestoreCallback() { @Override public void restore(byte[] key, byte[] value) { restoring = true; http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index ded2732..7786348 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -121,7 +121,7 @@ public class TopologyBuilderTest { final Serde<String> stringSerde = Serdes.String(); try { - builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), new String[]{}); + builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer()); fail("Should throw TopologyBuilderException with no topics"); } catch (TopologyBuilderException tpe) { //no-op http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java index 88aba94..e2ea668 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java @@ -124,7 +124,7 @@ public class CompositeRestoreListenerTest { @Test public void shouldHandleNullReportStoreListener() { compositeRestoreListener = new CompositeRestoreListener(batchingStateRestoreCallback); - compositeRestoreListener.setGlobalRestoreListener(null); + compositeRestoreListener.setUserRestoreListener(null); compositeRestoreListener.restoreAll(records); compositeRestoreListener.onRestoreStart(topicPartition, storeName, startOffset, endOffset); @@ -138,7 +138,7 @@ public class CompositeRestoreListenerTest { @Test public void shouldHandleNoRestoreListener() { compositeRestoreListener = new CompositeRestoreListener(noListenBatchingStateRestoreCallback); - compositeRestoreListener.setGlobalRestoreListener(null); + compositeRestoreListener.setUserRestoreListener(null); compositeRestoreListener.restoreAll(records); compositeRestoreListener.onRestoreStart(topicPartition, storeName, startOffset, endOffset); @@ -151,7 +151,7 @@ public class CompositeRestoreListenerTest { @Test(expected = UnsupportedOperationException.class) public void shouldThrowExceptionWhenSinglePutDirectlyCalled() { compositeRestoreListener = new CompositeRestoreListener(noListenBatchingStateRestoreCallback); - compositeRestoreListener.setGlobalRestoreListener(null); + compositeRestoreListener.setUserRestoreListener(null); compositeRestoreListener.restore(key, value); } @@ -179,7 +179,7 @@ public class CompositeRestoreListenerTest { private void setUpCompositeRestoreListener(StateRestoreCallback stateRestoreCallback) { compositeRestoreListener = new CompositeRestoreListener(stateRestoreCallback); - compositeRestoreListener.setGlobalRestoreListener(reportingStoreListener); + compositeRestoreListener.setUserRestoreListener(reportingStoreListener); } http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index b438347..0519fb0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -166,7 +166,7 @@ public class GlobalStateManagerImplTest { stateManager.initialize(context); try { - stateManager.register(new NoOpReadOnlyStore<>("not-in-topology"), false, new TheStateRestoreCallback()); + stateManager.register(new NoOpReadOnlyStore<>("not-in-topology"), new TheStateRestoreCallback()); fail("should have raised an illegal argument exception as store is not in the topology"); } catch (final IllegalArgumentException e) { // pass @@ -177,9 +177,9 @@ public class GlobalStateManagerImplTest { public void shouldThrowIllegalArgumentExceptionIfAttemptingToRegisterStoreTwice() { stateManager.initialize(context); initializeConsumer(2, 1, t1); - stateManager.register(store1, false, new TheStateRestoreCallback()); + stateManager.register(store1, new TheStateRestoreCallback()); try { - stateManager.register(store1, false, new TheStateRestoreCallback()); + stateManager.register(store1, new TheStateRestoreCallback()); fail("should have raised an illegal argument exception as store has already been registered"); } catch (final IllegalArgumentException e) { // pass @@ -190,7 +190,7 @@ public class GlobalStateManagerImplTest { public void shouldThrowStreamsExceptionIfNoPartitionsFoundForStore() { stateManager.initialize(context); try { - stateManager.register(store1, false, new TheStateRestoreCallback()); + stateManager.register(store1, new TheStateRestoreCallback()); fail("Should have raised a StreamsException as there are no partition for the store"); } catch (final StreamsException e) { // pass @@ -204,7 +204,7 @@ public class GlobalStateManagerImplTest { stateManager.initialize(context); final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback(); - stateManager.register(store1, false, stateRestoreCallback); + stateManager.register(store1, stateRestoreCallback); assertEquals(2, stateRestoreCallback.restored.size()); } @@ -236,7 +236,7 @@ public class GlobalStateManagerImplTest { stateManager.initialize(context); final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback(); - stateManager.register(store1, false, stateRestoreCallback); + stateManager.register(store1, stateRestoreCallback); assertEquals(5, stateRestoreCallback.restored.size()); } @@ -247,9 +247,9 @@ public class GlobalStateManagerImplTest { final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback(); // register the stores initializeConsumer(1, 1, t1); - stateManager.register(store1, false, stateRestoreCallback); + stateManager.register(store1, stateRestoreCallback); initializeConsumer(1, 1, t2); - stateManager.register(store2, false, stateRestoreCallback); + stateManager.register(store2, stateRestoreCallback); stateManager.flush(); assertTrue(store1.flushed); @@ -267,7 +267,7 @@ public class GlobalStateManagerImplTest { public void flush() { throw new RuntimeException("KABOOM!"); } - }, false, stateRestoreCallback); + }, stateRestoreCallback); stateManager.flush(); } @@ -278,9 +278,9 @@ public class GlobalStateManagerImplTest { final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback(); // register the stores initializeConsumer(1, 1, t1); - stateManager.register(store1, false, stateRestoreCallback); + stateManager.register(store1, stateRestoreCallback); initializeConsumer(1, 1, t2); - stateManager.register(store2, false, stateRestoreCallback); + stateManager.register(store2, stateRestoreCallback); stateManager.close(Collections.<TopicPartition, Long>emptyMap()); assertFalse(store1.isOpen()); @@ -292,7 +292,7 @@ public class GlobalStateManagerImplTest { stateManager.initialize(context); final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback(); initializeConsumer(1, 1, t1); - stateManager.register(store1, false, stateRestoreCallback); + stateManager.register(store1, stateRestoreCallback); final Map<TopicPartition, Long> expected = Collections.singletonMap(t1, 25L); stateManager.close(expected); final Map<TopicPartition, Long> result = readOffsetsCheckpoint(); @@ -308,7 +308,7 @@ public class GlobalStateManagerImplTest { public void close() { throw new RuntimeException("KABOOM!"); } - }, false, stateRestoreCallback); + }, stateRestoreCallback); stateManager.close(Collections.<TopicPartition, Long>emptyMap()); } @@ -317,7 +317,7 @@ public class GlobalStateManagerImplTest { public void shouldThrowIllegalArgumentExceptionIfCallbackIsNull() { stateManager.initialize(context); try { - stateManager.register(store1, false, null); + stateManager.register(store1, null); fail("should have thrown due to null callback"); } catch (IllegalArgumentException e) { //pass @@ -349,7 +349,7 @@ public class GlobalStateManagerImplTest { } super.close(); } - }, false, stateRestoreCallback); + }, stateRestoreCallback); stateManager.close(Collections.<TopicPartition, Long>emptyMap()); @@ -368,9 +368,9 @@ public class GlobalStateManagerImplTest { throw new RuntimeException("KABOOM!"); } }; - stateManager.register(store, false, stateRestoreCallback); + stateManager.register(store, stateRestoreCallback); - stateManager.register(store2, false, stateRestoreCallback); + stateManager.register(store2, stateRestoreCallback); try { stateManager.close(Collections.<TopicPartition, Long>emptyMap()); @@ -415,9 +415,9 @@ public class GlobalStateManagerImplTest { stateManager.initialize(context); final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback(); initializeConsumer(10, 1, t1); - stateManager.register(store1, false, stateRestoreCallback); + stateManager.register(store1, stateRestoreCallback); initializeConsumer(20, 1, t2); - stateManager.register(store2, false, stateRestoreCallback); + stateManager.register(store2, stateRestoreCallback); final Map<TopicPartition, Long> initialCheckpoint = stateManager.checkpointed(); stateManager.checkpoint(Collections.singletonMap(t1, 101L)); @@ -444,7 +444,7 @@ public class GlobalStateManagerImplTest { stateManager.initialize(context); final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback(); - stateManager.register(store1, false, stateRestoreCallback); + stateManager.register(store1, stateRestoreCallback); final KeyValue<byte[], byte[]> restoredKv = stateRestoreCallback.restored.get(0); assertThat(stateRestoreCallback.restored, equalTo(Collections.singletonList(KeyValue.pair(restoredKv.key, restoredKv.value)))); } @@ -454,7 +454,7 @@ public class GlobalStateManagerImplTest { stateManager.initialize(context); final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback(); initializeConsumer(10, 1, t1); - stateManager.register(store1, false, stateRestoreCallback); + stateManager.register(store1, stateRestoreCallback); stateManager.close(Collections.<TopicPartition, Long>emptyMap()); final Map<TopicPartition, Long> checkpointMap = stateManager.checkpointed(); http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java index 4ece443..63783a2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; import org.apache.kafka.streams.errors.LogAndFailExceptionHandler; @@ -51,6 +52,7 @@ import static org.junit.Assert.fail; public class GlobalStateTaskTest { + private final LogContext logContext = new LogContext(); private Map<TopicPartition, Long> offsets; private GlobalStateUpdateTask globalStateTask; private GlobalStateManagerStub stateMgr; @@ -92,7 +94,7 @@ public class GlobalStateTaskTest { offsets.put(t1, 50L); offsets.put(t2, 100L); stateMgr = new GlobalStateManagerStub(storeNames, offsets); - globalStateTask = new GlobalStateUpdateTask(topology, context, stateMgr, new LogAndFailExceptionHandler()); + globalStateTask = new GlobalStateUpdateTask(topology, context, stateMgr, new LogAndFailExceptionHandler(), logContext); } @Test @@ -175,8 +177,12 @@ public class GlobalStateTaskTest { @Test public void shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler() throws Exception { - final GlobalStateUpdateTask globalStateTask2 = new GlobalStateUpdateTask(topology, context, stateMgr, - new LogAndContinueExceptionHandler()); + final GlobalStateUpdateTask globalStateTask2 = new GlobalStateUpdateTask( + topology, + context, + stateMgr, + new LogAndContinueExceptionHandler(), + logContext); final byte[] key = new LongSerializer().serialize("t2", 1L); final byte[] recordValue = new IntegerSerializer().serialize("t2", 10); @@ -185,8 +191,12 @@ public class GlobalStateTaskTest { @Test public void shouldNotThrowStreamsExceptionWhenValueDeserializationFails() throws Exception { - final GlobalStateUpdateTask globalStateTask2 = new GlobalStateUpdateTask(topology, context, stateMgr, - new LogAndContinueExceptionHandler()); + final GlobalStateUpdateTask globalStateTask2 = new GlobalStateUpdateTask( + topology, + context, + stateMgr, + new LogAndContinueExceptionHandler(), + logContext); final byte[] key = new IntegerSerializer().serialize("t2", 1); final byte[] recordValue = new LongSerializer().serialize("t2", 10L); http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index 95636ec..e223699 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -295,6 +295,7 @@ public class InternalTopologyBuilderTest { } catch (final TopologyException expected) { /* ok */ } } + @SuppressWarnings("deprecation") @Test public void testAddStateStore() { final StateStoreSupplier supplier = new MockStateStoreSupplier("store-1", false); @@ -344,6 +345,7 @@ public class InternalTopologyBuilderTest { assertEquals(mkSet(mkSet("topic-1", "X-topic-1x", "topic-2")), new HashSet<>(copartitionGroups)); } + @SuppressWarnings("deprecation") @Test public void testTopicGroupsByStateStore() { builder.setApplicationId("X"); @@ -470,6 +472,7 @@ public class InternalTopologyBuilderTest { builder.setApplicationId(null); } + @SuppressWarnings("deprecation") @Test(expected = NullPointerException.class) public void shouldNotAddNullStateStoreSupplier() throws Exception { builder.addStateStore((StateStoreSupplier) null); http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java index d9f38eb..00a2c35 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.test.MockSourceNode; @@ -35,14 +36,27 @@ import java.util.List; import static org.junit.Assert.assertEquals; public class PartitionGroupTest { + private final LogContext logContext = new LogContext(); private final Serializer<Integer> intSerializer = new IntegerSerializer(); private final Deserializer<Integer> intDeserializer = new IntegerDeserializer(); private final TimestampExtractor timestampExtractor = new MockTimestampExtractor(); private final String[] topics = {"topic"}; private final TopicPartition partition1 = new TopicPartition(topics[0], 1); private final TopicPartition partition2 = new TopicPartition(topics[0], 2); - private final RecordQueue queue1 = new RecordQueue(partition1, new MockSourceNode<>(topics, intDeserializer, intDeserializer), timestampExtractor, new LogAndContinueExceptionHandler(), null); - private final RecordQueue queue2 = new RecordQueue(partition2, new MockSourceNode<>(topics, intDeserializer, intDeserializer), timestampExtractor, new LogAndContinueExceptionHandler(), null); + private final RecordQueue queue1 = new RecordQueue( + partition1, + new MockSourceNode<>(topics, intDeserializer, intDeserializer), + timestampExtractor, + new LogAndContinueExceptionHandler(), + null, + logContext); + private final RecordQueue queue2 = new RecordQueue( + partition2, + new MockSourceNode<>(topics, intDeserializer, intDeserializer), + timestampExtractor, + new LogAndContinueExceptionHandler(), + null, + logContext); private final byte[] recordValue = intSerializer.serialize(null, 10); private final byte[] recordKey = intSerializer.serialize(null, 1);
