http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 73f443e..b753cf9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Meter; import org.apache.kafka.common.metrics.stats.SampledStat; import org.apache.kafka.common.metrics.stats.Sum; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KafkaClientSupplier; import org.apache.kafka.streams.StreamsConfig; @@ -49,7 +50,6 @@ import org.apache.kafka.streams.processor.TaskMetadata; import org.apache.kafka.streams.processor.ThreadMetadata; import org.apache.kafka.streams.state.internals.ThreadCache; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.File; import java.util.ArrayList; @@ -68,7 +68,7 @@ import static java.util.Collections.singleton; public class StreamThread extends Thread implements ThreadDataProvider { - private static final Logger log = LoggerFactory.getLogger(StreamThread.class); + private final Logger log; private static final AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1); /** @@ -193,10 +193,10 @@ public class StreamThread extends Thread implements ThreadDataProvider { // refused but we do not throw exception here return false; } else if (!state.isValidTransition(newState)) { - log.error("{} Unexpected state transition from {} to {}", logPrefix, oldState, newState); - throw new StreamsException(logPrefix + " Unexpected state transition from " + oldState + " to " + newState); + log.error("Unexpected state transition from {} to {}", oldState, newState); + throw new StreamsException(logPrefix + "Unexpected state transition from " + oldState + " to " + newState); } else { - log.info("{} State transition from {} to {}", logPrefix, oldState, newState); + log.info("State transition from {} to {}", oldState, newState); } state = newState; @@ -229,24 +229,23 @@ public class StreamThread extends Thread implements ThreadDataProvider { private final Time time; private final TaskManager taskManager; private final StreamThread streamThread; - private final String logPrefix; + private final Logger log; RebalanceListener(final Time time, final TaskManager taskManager, final StreamThread streamThread, - final String logPrefix) { + final Logger log) { this.time = time; this.taskManager = taskManager; this.streamThread = streamThread; - this.logPrefix = logPrefix; + this.log = log; } @Override public void onPartitionsAssigned(final Collection<TopicPartition> assignment) { - log.debug("{} at state {}: partitions {} assigned at the end of consumer rebalance.\n" + + log.debug("at state {}: partitions {} assigned at the end of consumer rebalance.\n" + "\tcurrent suspended active tasks: {}\n" + "\tcurrent suspended standby tasks: {}\n", - logPrefix, streamThread.state, assignment, taskManager.suspendedActiveTaskIds(), @@ -260,15 +259,14 @@ public class StreamThread extends Thread implements ThreadDataProvider { taskManager.createTasks(assignment); streamThread.refreshMetadataState(); } catch (final Throwable t) { - log.error("{} Error caught during partition assignment, " + - "will abort the current process and re-throw at the end of rebalance: {}", logPrefix, t.getMessage()); + log.error("Error caught during partition assignment, " + + "will abort the current process and re-throw at the end of rebalance: {}", t.getMessage()); streamThread.setRebalanceException(t); } finally { - log.info("{} partition assignment took {} ms.\n" + + log.info("partition assignment took {} ms.\n" + "\tcurrent active tasks: {}\n" + "\tcurrent standby tasks: {}\n" + "\tprevious active tasks: {}\n", - logPrefix, time.milliseconds() - start, taskManager.activeTaskIds(), taskManager.standbyTaskIds(), @@ -278,10 +276,9 @@ public class StreamThread extends Thread implements ThreadDataProvider { @Override public void onPartitionsRevoked(final Collection<TopicPartition> assignment) { - log.debug("{} at state {}: partitions {} revoked at the beginning of consumer rebalance.\n" + + log.debug("at state {}: partitions {} revoked at the beginning of consumer rebalance.\n" + "\tcurrent assigned active tasks: {}\n" + "\tcurrent assigned standby tasks: {}\n", - logPrefix, streamThread.state, assignment, taskManager.activeTaskIds(), @@ -293,17 +290,16 @@ public class StreamThread extends Thread implements ThreadDataProvider { // suspend active tasks taskManager.suspendTasksAndState(); } catch (final Throwable t) { - log.error("{} Error caught during partition revocation, " + - "will abort the current process and re-throw at the end of rebalance: {}", logPrefix, t.getMessage()); + log.error("Error caught during partition revocation, " + + "will abort the current process and re-throw at the end of rebalance: {}", t.getMessage()); streamThread.setRebalanceException(t); } finally { streamThread.refreshMetadataState(); streamThread.clearStandbyRecords(); - log.info("{} partition revocation took {} ms.\n" + + log.info("partition revocation took {} ms.\n" + "\tsuspended active tasks: {}\n" + "\tsuspended standby tasks: {}", - logPrefix, time.milliseconds() - start, taskManager.suspendedActiveTaskIds(), taskManager.suspendedStandbyTaskIds()); @@ -321,7 +317,8 @@ public class StreamThread extends Thread implements ThreadDataProvider { final Sensor taskCreatedSensor; final ChangelogReader storeChangelogReader; final Time time; - final String logPrefix; + final Logger log; + AbstractTaskCreator(final InternalTopologyBuilder builder, final StreamsConfig config, @@ -330,7 +327,7 @@ public class StreamThread extends Thread implements ThreadDataProvider { final Sensor taskCreatedSensor, final ChangelogReader storeChangelogReader, final Time time, - final String logPrefix) { + final Logger log) { this.applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG); this.builder = builder; this.config = config; @@ -339,7 +336,7 @@ public class StreamThread extends Thread implements ThreadDataProvider { this.taskCreatedSensor = taskCreatedSensor; this.storeChangelogReader = storeChangelogReader; this.time = time; - this.logPrefix = logPrefix; + this.log = log; } Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer, final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) { @@ -349,7 +346,7 @@ public class StreamThread extends Thread implements ThreadDataProvider { final Set<TopicPartition> partitions = newTaskAndPartitions.getValue(); Task task = createTask(consumer, taskId, partitions); if (task != null) { - log.trace("{} Created task {} with assigned partitions {}", logPrefix, taskId, partitions); + log.trace("Created task {} with assigned partitions {}", taskId, partitions); createdTasks.add(task); } @@ -368,7 +365,6 @@ public class StreamThread extends Thread implements ThreadDataProvider { private final String threadClientId; private final Producer<byte[], byte[]> threadProducer; - TaskCreator(final InternalTopologyBuilder builder, final StreamsConfig config, final StreamsMetrics streamsMetrics, @@ -380,7 +376,7 @@ public class StreamThread extends Thread implements ThreadDataProvider { final KafkaClientSupplier clientSupplier, final Producer<byte[], byte[]> threadProducer, final String threadClientId, - final String logPrefix) { + final Logger log) { super(builder, config, streamsMetrics, @@ -388,7 +384,7 @@ public class StreamThread extends Thread implements ThreadDataProvider { taskCreatedSensor, storeChangelogReader, time, - logPrefix); + log); this.cache = cache; this.clientSupplier = clientSupplier; this.threadProducer = threadProducer; @@ -419,7 +415,7 @@ public class StreamThread extends Thread implements ThreadDataProvider { // eos if (threadProducer == null) { final Map<String, Object> producerConfigs = config.getProducerConfigs(threadClientId + "-" + id); - log.info("{} Creating producer client for task {}", logPrefix, id); + log.info("Creating producer client for task {}", id); producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, applicationId + "-" + id); return clientSupplier.getProducer(producerConfigs); } @@ -434,7 +430,7 @@ public class StreamThread extends Thread implements ThreadDataProvider { try { threadProducer.close(); } catch (final Throwable e) { - log.error("{} Failed to close producer due to the following error:", logPrefix, e); + log.error("Failed to close producer due to the following error:", e); } } } @@ -448,7 +444,7 @@ public class StreamThread extends Thread implements ThreadDataProvider { final Sensor taskCreatedSensor, final ChangelogReader storeChangelogReader, final Time time, - final String logPrefix) { + final Logger log) { super(builder, config, streamsMetrics, @@ -456,7 +452,7 @@ public class StreamThread extends Thread implements ThreadDataProvider { taskCreatedSensor, storeChangelogReader, time, - logPrefix); + log); } @Override @@ -468,7 +464,7 @@ public class StreamThread extends Thread implements ThreadDataProvider { if (!topology.stateStores().isEmpty()) { return new StandbyTask(taskId, applicationId, partitions, topology, consumer, storeChangelogReader, config, streamsMetrics, stateDirectory); } else { - log.trace("{} Skipped standby task {} with assigned partitions {} since it does not have any state stores to materialize", logPrefix, taskId, partitions); + log.trace("Skipped standby task {} with assigned partitions {} since it does not have any state stores to materialize", taskId, partitions); return null; } @@ -601,21 +597,24 @@ public class StreamThread extends Thread implements ThreadDataProvider { this.time = time; this.streamsMetadataState = streamsMetadataState; this.taskManager = taskManager; - this.logPrefix = logPrefix(threadClientId); + this.logPrefix = String.format("stream-thread [%s] ", threadClientId); this.streamsMetrics = streamsMetrics; this.restoreConsumer = restoreConsumer; this.stateDirectory = stateDirectory; - this.rebalanceListener = new RebalanceListener(time, taskManager, this, logPrefix); this.config = config; this.stateLock = new Object(); this.standbyRecords = new HashMap<>(); this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class); - log.info("{} Creating consumer client", logPrefix); + final LogContext logContext = new LogContext(this.logPrefix); + this.log = logContext.logger(StreamThread.class); + this.rebalanceListener = new RebalanceListener(time, taskManager, this, this.log); + + log.info("Creating consumer client"); final Map<String, Object> consumerConfigs = config.getConsumerConfigs(this, applicationId, threadClientId); if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) { originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); - log.info("{} Custom offset resets specified updating configs original auto offset reset {}", logPrefix, originalReset); + log.info("Custom offset resets specified updating configs original auto offset reset {}", originalReset); consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); } this.consumer = clientSupplier.getConsumer(consumerConfigs); @@ -643,25 +642,29 @@ public class StreamThread extends Thread implements ThreadDataProvider { Collections.singletonMap("client-id", threadClientId)); - final String logPrefix = logPrefix(threadClientId); + final String logPrefix = String.format("stream-thread [%s] ", threadClientId); + final LogContext logContext = new LogContext(logPrefix); + final Logger log = logContext.logger(StreamThread.class); + if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) { - log.warn("{} Negative cache size passed in thread. Reverting to cache size of 0 bytes", logPrefix); + log.warn("Negative cache size passed in thread. Reverting to cache size of 0 bytes"); } - final ThreadCache cache = new ThreadCache(threadClientId, cacheSizeBytes, streamsMetrics); + final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics); final boolean eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)); - log.info("{} Creating restore consumer client", logPrefix); + log.info("Creating restore consumer client"); final Map<String, Object> consumerConfigs = config.getRestoreConsumerConfigs(threadClientId); final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(consumerConfigs); final StoreChangelogReader changelogReader = new StoreChangelogReader(threadClientId, restoreConsumer, - stateRestoreListener); + stateRestoreListener, + logContext); Producer<byte[], byte[]> threadProducer = null; if (!eosEnabled) { final Map<String, Object> producerConfigs = config.getProducerConfigs(threadClientId); - log.info("{} Creating shared producer client", logPrefix); + log.info("Creating shared producer client"); threadProducer = clientSupplier.getProducer(producerConfigs); } @@ -676,7 +679,7 @@ public class StreamThread extends Thread implements ThreadDataProvider { clientSupplier, threadProducer, threadClientId, - logPrefix); + log); final AbstractTaskCreator standbyTaskCreator = new StandbyTaskCreator(builder, config, streamsMetrics, @@ -684,16 +687,16 @@ public class StreamThread extends Thread implements ThreadDataProvider { streamsMetrics.taskCreatedSensor, changelogReader, time, - logPrefix); + log); final TaskManager taskManager = new TaskManager(changelogReader, logPrefix, restoreConsumer, activeTaskCreator, standbyTaskCreator, - new AssignedTasks(logPrefix, + new AssignedTasks(logContext, "stream task" ), - new AssignedTasks(logPrefix, + new AssignedTasks(logContext, "standby task" )); @@ -713,10 +716,6 @@ public class StreamThread extends Thread implements ThreadDataProvider { } - private static String logPrefix(final String threadClientId) { - return String.format("stream-thread [%s]", threadClientId); - } - /** * Execute the stream processors * @@ -725,7 +724,7 @@ public class StreamThread extends Thread implements ThreadDataProvider { */ @Override public void run() { - log.info("{} Starting", logPrefix); + log.info("Starting"); setState(State.RUNNING); boolean cleanRun = false; try { @@ -737,7 +736,7 @@ public class StreamThread extends Thread implements ThreadDataProvider { } catch (final Exception e) { // we have caught all Kafka related exceptions, and other runtime exceptions // should be due to user application errors - log.error("{} Encountered the following error during processing:", logPrefix, e); + log.error("Encountered the following error during processing:", e); throw e; } finally { completeShutdown(cleanRun); @@ -808,7 +807,7 @@ public class StreamThread extends Thread implements ThreadDataProvider { if (rebalanceException != null) { if (!(rebalanceException instanceof ProducerFencedException)) { - throw new StreamsException(logPrefix + " Failed to rebalance.", rebalanceException); + throw new StreamsException(logPrefix + "Failed to rebalance.", rebalanceException); } } @@ -823,9 +822,9 @@ public class StreamThread extends Thread implements ThreadDataProvider { for (final TopicPartition partition : partitions) { if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) { - addToResetList(partition, seekToBeginning, "{} Setting topic '{}' to consume from {} offset", "earliest", loggedTopics); + addToResetList(partition, seekToBeginning, "Setting topic '{}' to consume from {} offset", "earliest", loggedTopics); } else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) { - addToResetList(partition, seekToEnd, "{} Setting topic '{}' to consume from {} offset", "latest", loggedTopics); + addToResetList(partition, seekToEnd, "Setting topic '{}' to consume from {} offset", "latest", loggedTopics); } else { if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) { final String errorMessage = "No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured." + @@ -835,9 +834,9 @@ public class StreamThread extends Thread implements ThreadDataProvider { } if (originalReset.equals("earliest")) { - addToResetList(partition, seekToBeginning, "{} No custom setting defined for topic '{}' using original config '{}' for offset reset", "earliest", loggedTopics); + addToResetList(partition, seekToBeginning, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "earliest", loggedTopics); } else if (originalReset.equals("latest")) { - addToResetList(partition, seekToEnd, "{} No custom setting defined for topic '{}' using original config '{}' for offset reset", "latest", loggedTopics); + addToResetList(partition, seekToEnd, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "latest", loggedTopics); } } } @@ -853,7 +852,7 @@ public class StreamThread extends Thread implements ThreadDataProvider { private void addToResetList(final TopicPartition partition, final Set<TopicPartition> partitions, final String logMessage, final String resetPolicy, final Set<String> loggedTopics) { final String topic = partition.topic(); if (loggedTopics.add(topic)) { - log.info(logMessage, logPrefix, topic, resetPolicy); + log.info(logMessage, topic, resetPolicy); } partitions.add(partition); } @@ -937,13 +936,13 @@ public class StreamThread extends Thread implements ThreadDataProvider { if (processLatency > 0 && processLatency > commitTime) { // push down recordsProcessedBeforeCommit = Math.max(1, (commitTime * totalProcessed) / processLatency); - log.debug("{} processing latency {} > commit time {} for {} records. Adjusting down recordsProcessedBeforeCommit={}", - logPrefix, processLatency, commitTime, totalProcessed, recordsProcessedBeforeCommit); + log.debug("processing latency {} > commit time {} for {} records. Adjusting down recordsProcessedBeforeCommit={}", + processLatency, commitTime, totalProcessed, recordsProcessedBeforeCommit); } else if (prevRecordsProcessedBeforeCommit != UNLIMITED_RECORDS && processLatency > 0) { // push up recordsProcessedBeforeCommit = Math.max(1, (commitTime * totalProcessed) / processLatency); - log.debug("{} processing latency {} < commit time {} for {} records. Adjusting up recordsProcessedBeforeCommit={}", - logPrefix, processLatency, commitTime, totalProcessed, recordsProcessedBeforeCommit); + log.debug("processing latency {} < commit time {} for {} records. Adjusting up recordsProcessedBeforeCommit={}", + processLatency, commitTime, totalProcessed, recordsProcessedBeforeCommit); } return recordsProcessedBeforeCommit; @@ -955,8 +954,8 @@ public class StreamThread extends Thread implements ThreadDataProvider { void maybeCommit(final long now) { if (commitTimeMs >= 0 && lastCommitMs + commitTimeMs < now) { if (log.isTraceEnabled()) { - log.trace("{} Committing all active tasks {} and standby tasks {} since {}ms has elapsed (commit interval is {}ms)", - logPrefix, taskManager.activeTaskIds(), taskManager.standbyTaskIds(), now - lastCommitMs, commitTimeMs); + log.trace("Committing all active tasks {} and standby tasks {} since {}ms has elapsed (commit interval is {}ms)", + taskManager.activeTaskIds(), taskManager.standbyTaskIds(), now - lastCommitMs, commitTimeMs); } int committed = taskManager.commitAll(); @@ -964,8 +963,8 @@ public class StreamThread extends Thread implements ThreadDataProvider { streamsMetrics.commitTimeSensor.record(computeLatency() / (double) committed, timerStartedMs); } if (log.isDebugEnabled()) { - log.info("{} Committed all active tasks {} and standby tasks {} in {}ms", - logPrefix, taskManager.activeTaskIds(), taskManager.standbyTaskIds(), timerStartedMs - now); + log.info("Committed all active tasks {} and standby tasks {} in {}ms", + taskManager.activeTaskIds(), taskManager.standbyTaskIds(), timerStartedMs - now); } lastCommitMs = now; @@ -996,7 +995,7 @@ public class StreamThread extends Thread implements ThreadDataProvider { standbyRecords = remainingStandbyRecords; - log.debug("{} Updated standby tasks {} in {}ms", logPrefix, taskManager.standbyTaskIds(), time.milliseconds() - now); + log.debug("Updated standby tasks {} in {}ms", taskManager.standbyTaskIds(), time.milliseconds() - now); } processStandbyRecords = false; } @@ -1008,7 +1007,7 @@ public class StreamThread extends Thread implements ThreadDataProvider { final Task task = taskManager.standbyTask(partition); if (task == null) { - throw new StreamsException(logPrefix + " Missing standby task for partition " + partition); + throw new StreamsException(logPrefix + "Missing standby task for partition " + partition); } final List<ConsumerRecord<byte[], byte[]>> remaining = task.update(partition, records.records(partition)); @@ -1041,7 +1040,7 @@ public class StreamThread extends Thread implements ThreadDataProvider { * (e.g., in testing), hence the state is set only the first time */ public void shutdown() { - log.info("{} Informed to shut down", logPrefix); + log.info("Informed to shut down"); setState(State.PENDING_SHUTDOWN); } @@ -1152,23 +1151,23 @@ public class StreamThread extends Thread implements ThreadDataProvider { // intentionally do not check the returned flag setState(State.PENDING_SHUTDOWN); - log.info("{} Shutting down", logPrefix); + log.info("Shutting down"); taskManager.shutdown(cleanRun); try { consumer.close(); } catch (final Throwable e) { - log.error("{} Failed to close consumer due to the following error:", logPrefix, e); + log.error("Failed to close consumer due to the following error:", e); } try { restoreConsumer.close(); } catch (final Throwable e) { - log.error("{} Failed to close restore consumer due to the following error:", logPrefix, e); + log.error("Failed to close restore consumer due to the following error:", e); } streamsMetrics.removeAllSensors(); setState(State.DEAD); - log.info("{} Shutdown complete", logPrefix); + log.info("Shutdown complete"); } private void clearStandbyRecords() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 2d896fa..7afbecf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -18,10 +18,10 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.TaskId; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.Collections; @@ -36,7 +36,7 @@ class TaskManager { // initialize the task list // activeTasks needs to be concurrent as it can be accessed // by QueryableState - private static final Logger log = LoggerFactory.getLogger(TaskManager.class); + private final Logger log; private final AssignedTasks active; private final AssignedTasks standby; private final ChangelogReader changelogReader; @@ -61,14 +61,18 @@ class TaskManager { this.standbyTaskCreator = standbyTaskCreator; this.active = active; this.standby = standby; + + final LogContext logContext = new LogContext(logPrefix); + + this.log = logContext.logger(getClass()); } void createTasks(final Collection<TopicPartition> assignment) { if (threadMetadataProvider == null) { - throw new IllegalStateException(logPrefix + " taskIdProvider has not been initialized while adding stream tasks. This should not happen."); + throw new IllegalStateException(logPrefix + "taskIdProvider has not been initialized while adding stream tasks. This should not happen."); } if (consumer == null) { - throw new IllegalStateException(logPrefix + " consumer has not been initialized while adding stream tasks. This should not happen."); + throw new IllegalStateException(logPrefix + "consumer has not been initialized while adding stream tasks. This should not happen."); } changelogReader.reset(); @@ -80,7 +84,7 @@ class TaskManager { addStreamTasks(assignment); addStandbyTasks(); final Set<TopicPartition> partitions = active.uninitializedPartitions(); - log.trace("{} pausing partitions: {}", logPrefix, partitions); + log.trace("pausing partitions: {}", partitions); consumer.pause(partitions); } @@ -95,7 +99,7 @@ class TaskManager { } final Map<TaskId, Set<TopicPartition>> newTasks = new HashMap<>(); // collect newly assigned tasks and reopen re-assigned tasks - log.debug("{} Adding assigned tasks as active: {}", logPrefix, assignedTasks); + log.debug("Adding assigned tasks as active: {}", assignedTasks); for (final Map.Entry<TaskId, Set<TopicPartition>> entry : assignedTasks.entrySet()) { final TaskId taskId = entry.getKey(); final Set<TopicPartition> partitions = entry.getValue(); @@ -106,11 +110,11 @@ class TaskManager { newTasks.put(taskId, partitions); } } catch (final StreamsException e) { - log.error("{} Failed to create an active task {} due to the following error:", logPrefix, taskId, e); + log.error("Failed to create an active task {} due to the following error:", taskId, e); throw e; } } else { - log.warn("{} Task {} owned partitions {} are not contained in the assignment {}", logPrefix, taskId, partitions, assignment); + log.warn("Task {} owned partitions {} are not contained in the assignment {}", taskId, partitions, assignment); } } @@ -120,7 +124,7 @@ class TaskManager { // create all newly assigned tasks (guard against race condition with other thread via backoff and retry) // -> other thread will call removeSuspendedTasks(); eventually - log.trace("{} New active tasks to be created: {}", logPrefix, newTasks); + log.trace("New active tasks to be created: {}", newTasks); for (final Task task : taskCreator.createTasks(consumer, newTasks)) { active.addNewTask(task); @@ -132,7 +136,7 @@ class TaskManager { if (assignedStandbyTasks.isEmpty()) { return; } - log.debug("{} Adding assigned standby tasks {}", logPrefix, assignedStandbyTasks); + log.debug("Adding assigned standby tasks {}", assignedStandbyTasks); final Map<TaskId, Set<TopicPartition>> newStandbyTasks = new HashMap<>(); // collect newly assigned standby tasks and reopen re-assigned standby tasks for (final Map.Entry<TaskId, Set<TopicPartition>> entry : assignedStandbyTasks.entrySet()) { @@ -150,7 +154,7 @@ class TaskManager { // create all newly assigned standby tasks (guard against race condition with other thread via backoff and retry) // -> other thread will call removeSuspendedStandbyTasks(); eventually - log.trace("{} New standby tasks to be created: {}", logPrefix, newStandbyTasks); + log.trace("New standby tasks to be created: {}", newStandbyTasks); for (final Task task : standbyTaskCreator.createTasks(consumer, newStandbyTasks)) { standby.addNewTask(task); @@ -174,8 +178,7 @@ class TaskManager { * soon the tasks will be assigned again */ void suspendTasksAndState() { - log.debug("{} Suspending all active tasks {} and standby tasks {}", - logPrefix, active.runningTaskIds(), standby.runningTaskIds()); + log.debug("Suspending all active tasks {} and standby tasks {}", active.runningTaskIds(), standby.runningTaskIds()); final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null); @@ -185,7 +188,7 @@ class TaskManager { firstException.compareAndSet(null, unAssignChangeLogPartitions()); if (firstException.get() != null) { - throw new StreamsException(logPrefix + " failed to suspend stream tasks", firstException.get()); + throw new StreamsException(logPrefix + "failed to suspend stream tasks", firstException.get()); } } @@ -194,15 +197,14 @@ class TaskManager { // un-assign the change log partitions restoreConsumer.assign(Collections.<TopicPartition>emptyList()); } catch (final RuntimeException e) { - log.error("{} Failed to un-assign change log partitions due to the following error:", logPrefix, e); + log.error("Failed to un-assign change log partitions due to the following error:", e); return e; } return null; } void shutdown(final boolean clean) { - log.debug("{} Shutting down all active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}", - logPrefix, active.runningTaskIds(), standby.runningTaskIds(), + log.debug("Shutting down all active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}", active.runningTaskIds(), standby.runningTaskIds(), active.previousTaskIds(), standby.previousTaskIds()); active.close(clean); @@ -210,7 +212,7 @@ class TaskManager { try { threadMetadataProvider.close(); } catch (final Throwable e) { - log.error("{} Failed to close KafkaStreamClient due to the following error:", logPrefix, e); + log.error("Failed to close KafkaStreamClient due to the following error:", e); } // remove the changelog partitions from restore consumer unAssignChangeLogPartitions(); @@ -256,7 +258,7 @@ class TaskManager { final Set<TopicPartition> resumed = active.updateRestored(restored); if (!resumed.isEmpty()) { - log.trace("{} resuming partitions {}", logPrefix, resumed); + log.trace("resuming partitions {}", resumed); consumer.resume(resumed); } if (active.allTasksRunning()) { http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java index 1220c02..aab9671 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java @@ -17,11 +17,11 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.processor.internals.RecordContext; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.HashMap; @@ -37,9 +37,7 @@ import java.util.NoSuchElementException; * @see org.apache.kafka.streams.state.Stores#create(String) */ public class ThreadCache { - private static final Logger log = LoggerFactory.getLogger(ThreadCache.class); - - private final String logPrefix; + private final Logger log; private final long maxCacheSizeBytes; private final StreamsMetrics metrics; private final Map<String, NamedCache> caches = new HashMap<>(); @@ -54,10 +52,10 @@ public class ThreadCache { void apply(final List<DirtyEntry> dirty); } - public ThreadCache(final String logPrefix, long maxCacheSizeBytes, final StreamsMetrics metrics) { - this.logPrefix = logPrefix; + public ThreadCache(final LogContext logContext, long maxCacheSizeBytes, final StreamsMetrics metrics) { this.maxCacheSizeBytes = maxCacheSizeBytes; this.metrics = metrics; + this.log = logContext.logger(getClass()); } public long puts() { @@ -129,8 +127,7 @@ public class ThreadCache { cache.flush(); if (log.isTraceEnabled()) { - log.trace("{} Cache stats on flush: #puts={}, #gets={}, #evicts={}, #flushes={}", - logPrefix, puts(), gets(), evicts(), flushes()); + log.trace("Cache stats on flush: #puts={}, #gets={}, #evicts={}, #flushes={}", puts(), gets(), evicts(), flushes()); } } @@ -250,7 +247,7 @@ public class ThreadCache { numEvicted++; } if (log.isTraceEnabled()) { - log.trace("{} Evicted {} entries from cache {}", logPrefix, numEvicted, namespace); + log.trace("Evicted {} entries from cache {}", numEvicted, namespace); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 33d55e2..73c5484 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -557,4 +557,5 @@ public class KafkaStreamsTest { mapStates.put(newState, prevCount + 1); } } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index 7f0d28a..21dc4f0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Initializer; @@ -87,7 +88,7 @@ public class KStreamSessionWindowAggregateProcessorTest { public void initializeStore() { final File stateDir = TestUtils.tempDirectory(); context = new MockProcessorContext(stateDir, - Serdes.String(), Serdes.String(), new NoOpRecordCollector(), new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics()))) { + Serdes.String(), Serdes.String(), new NoOpRecordCollector(), new ThreadCache(new LogContext("testCache "), 100000, new MockStreamsMetrics(new Metrics()))) { @Override public <K, V> void forward(final K key, final V value) { results.add(KeyValue.pair(key, value)); http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java index 16e5b39..4dc17c0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.PunctuationType; @@ -161,7 +162,7 @@ public class AbstractProcessorContextTest { } TestProcessorContext(final MockStreamsMetrics metrics) { - super(new TaskId(0, 0), "appId", new StreamsConfig(config), metrics, new StateManagerStub(), new ThreadCache("name", 0, metrics)); + super(new TaskId(0, 0), "appId", new StreamsConfig(config), metrics, new StateManagerStub(), new ThreadCache(new LogContext("name "), 0, metrics)); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java index d2d439c..02aa0a0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.ProcessorStateException; @@ -121,7 +122,7 @@ public class AbstractTaskTest { Collections.<String, String>emptyMap(), Collections.<StateStore>emptyList()), consumer, - new StoreChangelogReader(consumer, new MockStateRestoreListener()), + new StoreChangelogReader(consumer, new MockStateRestoreListener(), new LogContext("stream-task-test ")), false, stateDirectory, config) { http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java index 7f961af..7d6bb3a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java @@ -20,6 +20,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.processor.TaskId; import org.easymock.EasyMock; @@ -52,7 +53,7 @@ public class AssignedTasksTest { @Before public void before() { - assignedTasks = new AssignedTasks("log", "task"); + assignedTasks = new AssignedTasks(new LogContext("log "), "task"); EasyMock.expect(t1.id()).andReturn(taskId1).anyTimes(); EasyMock.expect(t2.id()).andReturn(taskId2).anyTimes(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index c56f609..fd9d070 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; @@ -108,7 +109,7 @@ public class ProcessorNodeTest { final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class); final Metrics metrics = new Metrics(); - final MockProcessorContext context = new MockProcessorContext(anyStateSerde, new RecordCollectorImpl(null, null), metrics); + final MockProcessorContext context = new MockProcessorContext(anyStateSerde, new RecordCollectorImpl(null, null, new LogContext("processnode-test ")), metrics); final ProcessorNode node = new ProcessorNode("name", new NoOpProcessor(), Collections.emptySet()); node.init(context); http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index fbf45b3..1db2200 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; @@ -71,6 +72,7 @@ public class ProcessorStateManagerTest { private final byte[] key = new byte[]{0x0, 0x0, 0x0, 0x1}; private final byte[] value = "the-value".getBytes(Charset.forName("UTF-8")); private final ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>(changelogTopic, 0, 0, key, value); + private final LogContext logContext = new LogContext("process-state-manager-test "); private File baseDir; private File checkpointFile; @@ -146,7 +148,8 @@ public class ProcessorStateManagerTest { } }, changelogReader, - false); + false, + logContext); try { stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); @@ -172,7 +175,8 @@ public class ProcessorStateManagerTest { } }, changelogReader, - false); + false, + logContext); try { stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback); @@ -220,7 +224,8 @@ public class ProcessorStateManagerTest { stateDirectory, storeToChangelogTopic, changelogReader, - false); + false, + logContext); try { stateMgr.register(store1, true, store1.stateRestoreCallback); @@ -252,7 +257,8 @@ public class ProcessorStateManagerTest { stateDirectory, Collections.<String, String>emptyMap(), changelogReader, - false); + false, + logContext); try { stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback); @@ -286,7 +292,8 @@ public class ProcessorStateManagerTest { } }, changelogReader, - false); + false, + logContext); try { // make sure the checkpoint file isn't deleted assertTrue(checkpointFile.exists()); @@ -320,7 +327,8 @@ public class ProcessorStateManagerTest { stateDirectory, Collections.<String, String>emptyMap(), changelogReader, - false); + false, + logContext); stateMgr.register(nonPersistentStore, false, nonPersistentStore.stateRestoreCallback); assertNotNull(stateMgr.getStore(nonPersistentStoreName)); } @@ -338,7 +346,8 @@ public class ProcessorStateManagerTest { stateDirectory, Collections.<String, String>emptyMap(), changelogReader, - false); + false, + logContext); stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); stateMgr.close(null); final Map<TopicPartition, Long> read = checkpoint.read(); @@ -354,7 +363,8 @@ public class ProcessorStateManagerTest { stateDirectory, Collections.singletonMap(persistentStore.name(), persistentStoreTopicName), changelogReader, - false); + false, + logContext); stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 10L)); @@ -371,7 +381,8 @@ public class ProcessorStateManagerTest { stateDirectory, Collections.singletonMap(persistentStore.name(), persistentStoreTopicName), changelogReader, - false); + false, + logContext); stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); final byte[] bytes = Serdes.Integer().serializer().serialize("", 10); @@ -401,7 +412,8 @@ public class ProcessorStateManagerTest { stateDirectory, Collections.singletonMap(nonPersistentStoreName, nonPersistentStoreTopicName), changelogReader, - false); + false, + logContext); stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback); stateMgr.checkpoint(Collections.singletonMap(topicPartition, 876L)); @@ -419,7 +431,8 @@ public class ProcessorStateManagerTest { stateDirectory, Collections.<String, String>emptyMap(), changelogReader, - false); + false, + logContext); stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); @@ -439,7 +452,8 @@ public class ProcessorStateManagerTest { stateDirectory, Collections.<String, String>emptyMap(), changelogReader, - false); + false, + logContext); try { stateManager.register(new MockStateStoreSupplier.MockStateStore(ProcessorStateManager.CHECKPOINT_FILE_NAME, true), true, null); @@ -458,7 +472,9 @@ public class ProcessorStateManagerTest { stateDirectory, Collections.<String, String>emptyMap(), changelogReader, - false); + false, + logContext); + stateManager.register(mockStateStore, false, null); try { @@ -480,7 +496,8 @@ public class ProcessorStateManagerTest { stateDirectory, Collections.singletonMap(storeName, changelogTopic), changelogReader, - false); + false, + logContext); final MockStateStoreSupplier.MockStateStore stateStore = new MockStateStoreSupplier.MockStateStore(storeName, true) { @Override @@ -512,7 +529,8 @@ public class ProcessorStateManagerTest { stateDirectory, Collections.<String, String>emptyMap(), changelogReader, - true); + true, + logContext); assertFalse(checkpointFile.exists()); } finally { @@ -534,7 +552,8 @@ public class ProcessorStateManagerTest { } }, changelogReader, - false); + false, + logContext); } private MockStateStoreSupplier.MockStateStore getPersistentStore() { http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index 09f46e0..7b2a41e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StreamPartitioner; import org.junit.Test; @@ -43,6 +44,8 @@ import static org.junit.Assert.assertEquals; public class RecordCollectorTest { + private final LogContext logContext = new LogContext("test "); + private final List<PartitionInfo> infos = Arrays.asList( new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), @@ -68,7 +71,7 @@ public class RecordCollectorTest { final RecordCollectorImpl collector = new RecordCollectorImpl( new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer), - "RecordCollectorTest-TestSpecificPartition"); + "RecordCollectorTest-TestSpecificPartition", new LogContext("RecordCollectorTest-TestSpecificPartition ")); collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer); collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer); @@ -100,7 +103,7 @@ public class RecordCollectorTest { final RecordCollectorImpl collector = new RecordCollectorImpl( new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer), - "RecordCollectorTest-TestStreamPartitioner"); + "RecordCollectorTest-TestStreamPartitioner", new LogContext("RecordCollectorTest-TestStreamPartitioner ")); collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); collector.send("topic1", "9", "0", null, stringSerializer, stringSerializer, streamPartitioner); @@ -135,7 +138,8 @@ public class RecordCollectorTest { return super.send(record, callback); } }, - "test"); + "test", + logContext); collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); final Long offset = collector.offsets().get(new TopicPartition("topic1", 0)); @@ -152,7 +156,8 @@ public class RecordCollectorTest { throw new TimeoutException(); } }, - "test"); + "test", + logContext); collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); @@ -169,7 +174,8 @@ public class RecordCollectorTest { return null; } }, - "test"); + "test", + logContext); collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); } @@ -185,7 +191,8 @@ public class RecordCollectorTest { return null; } }, - "test"); + "test", + logContext); collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); collector.flush(); } @@ -201,7 +208,8 @@ public class RecordCollectorTest { return null; } }, - "test"); + "test", + logContext); collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); collector.close(); } @@ -217,7 +225,8 @@ public class RecordCollectorTest { } }, - "test"); + "test", + logContext); collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java index c6e9eac..c33a9c4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; import org.apache.kafka.streams.errors.LogAndFailExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; @@ -53,7 +54,7 @@ public class RecordQueueTest { private final String[] topics = {"topic"}; final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class), - new RecordCollectorImpl(null, null)); + new RecordCollectorImpl(null, null, new LogContext("record-queue-test "))); private final MockSourceNode mockSourceNodeWithMetrics = new MockSourceNode<>(topics, intDeserializer, intDeserializer); private final RecordQueue queue = new RecordQueue(new TopicPartition(topics[0], 1), mockSourceNodeWithMetrics, http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java index 9da341b..ef99b8a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.test.MockProcessorContext; @@ -36,7 +37,7 @@ public class SinkNodeTest { private final Serializer anySerializer = Serdes.Bytes().serializer(); private final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class); private final MockProcessorContext context = new MockProcessorContext(anyStateSerde, - new RecordCollectorImpl(new MockProducer<byte[], byte[]>(true, anySerializer, anySerializer), null)); + new RecordCollectorImpl(new MockProducer<byte[], byte[]>(true, anySerializer, anySerializer), null, new LogContext("sinknode-test "))); private final SinkNode sink = new SinkNode<>("anyNodeName", "any-output-topic", anySerializer, anySerializer, null); @Before http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index fd63bd6..7a7b119 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; @@ -133,7 +134,7 @@ public class StandbyTaskTest { private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); private final MockRestoreConsumer restoreStateConsumer = new MockRestoreConsumer(); - private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener); + private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener, new LogContext("standby-task-test ")); private final byte[] recordValue = intSerializer.serialize(null, 10); private final byte[] recordKey = intSerializer.serialize(null, 1); http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java index 8024cd9..2bb5b7b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; import org.junit.Before; @@ -42,6 +43,7 @@ public class StateConsumerTest { private final MockTime time = new MockTime(); private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); private final Map<TopicPartition, Long> partitionOffsets = new HashMap<>(); + private final LogContext logContext = new LogContext("test "); private GlobalStreamThread.StateConsumer stateConsumer; private StateMaintainerStub stateMaintainer; @@ -50,7 +52,7 @@ public class StateConsumerTest { partitionOffsets.put(topicOne, 20L); partitionOffsets.put(topicTwo, 30L); stateMaintainer = new StateMaintainerStub(partitionOffsets); - stateConsumer = new GlobalStreamThread.StateConsumer("test", consumer, stateMaintainer, time, 10L, FLUSH_INTERVAL); + stateConsumer = new GlobalStreamThread.StateConsumer(logContext, consumer, stateMaintainer, time, 10L, FLUSH_INTERVAL); } @Test @@ -107,7 +109,7 @@ public class StateConsumerTest { @Test public void shouldNotFlushWhenFlushIntervalIsZero() { - stateConsumer = new GlobalStreamThread.StateConsumer("test", consumer, stateMaintainer, time, 10L, -1); + stateConsumer = new GlobalStreamThread.StateConsumer(logContext, consumer, stateMaintainer, time, 10L, -1); stateConsumer.initialize(); time.sleep(100); stateConsumer.pollAndUpdate(); http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index c574bbc..5a3fa69 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.StateRestoreListener; @@ -51,8 +52,9 @@ public class StoreChangelogReaderTest { private final CompositeRestoreListener restoreListener = new CompositeRestoreListener(callback); private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener(); - private final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener); private final TopicPartition topicPartition = new TopicPartition("topic", 0); + private final LogContext logContext = new LogContext("test-reader "); + private final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener, logContext); @Before public void setUp() { @@ -70,7 +72,7 @@ public class StoreChangelogReaderTest { } }; - final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener); + final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener, logContext); changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName")); changelogReader.restore(); http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 2b719d1..91dd422 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; @@ -114,7 +115,7 @@ public class StreamTaskTest { private final MockProducer<byte[], byte[]> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer); private final MockConsumer<byte[], byte[]> restoreStateConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener(); - private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener); + private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener, new LogContext("stream-task-test ")); private final byte[] recordValue = intSerializer.serialize(null, 10); private final byte[] recordKey = intSerializer.serialize(null, 1); private final String applicationId = "applicationId"; @@ -124,7 +125,7 @@ public class StreamTaskTest { private final MockTime time = new MockTime(); private File baseDir = TestUtils.tempDirectory(); private StateDirectory stateDirectory; - private final RecordCollectorImpl recordCollector = new RecordCollectorImpl(producer, "taskId"); + private final RecordCollectorImpl recordCollector = new RecordCollectorImpl(producer, "taskId", new LogContext("taskId ")); private StreamsConfig config; private StreamsConfig eosConfig; private StreamTask task; @@ -551,7 +552,7 @@ public class StreamTaskTest { changelogReader, config, streamsMetrics, stateDirectory, null, time, producer) { @Override - RecordCollector createRecordCollector() { + RecordCollector createRecordCollector(final LogContext logContext) { return new NoOpRecordCollector() { @Override public void flush() { @@ -605,7 +606,7 @@ public class StreamTaskTest { changelogReader, config, streamsMetrics, stateDirectory, null, time, producer) { @Override - RecordCollector createRecordCollector() { + RecordCollector createRecordCollector(final LogContext logContext) { return new NoOpRecordCollector() { @Override public Map<TopicPartition, Long> offsets() { @@ -672,7 +673,7 @@ public class StreamTaskTest { changelogReader, testConfig, streamsMetrics, stateDirectory, null, time, producer) { @Override - RecordCollector createRecordCollector() { + RecordCollector createRecordCollector(final LogContext logContext) { return new NoOpRecordCollector() { @Override public Map<TopicPartition, Long> offsets() { http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index c6d12c6..bf433da 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.ProcessorContext; @@ -184,7 +185,7 @@ public class KeyValueStoreTestDriver<K, V> { final ByteArraySerializer rawSerializer = new ByteArraySerializer(); final Producer<byte[], byte[]> producer = new MockProducer<>(true, rawSerializer, rawSerializer); - final RecordCollector recordCollector = new RecordCollectorImpl(producer, "KeyValueStoreTestDriver") { + final RecordCollector recordCollector = new RecordCollectorImpl(producer, "KeyValueStoreTestDriver", new LogContext("KeyValueStoreTestDriver ")) { @Override public <K1, V1> void send(final String topic, final K1 key, @@ -226,7 +227,7 @@ public class KeyValueStoreTestDriver<K, V> { props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, RocksDBKeyValueStoreTest.TheRocksDbConfigSetter.class); context = new MockProcessorContext(stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector, null) { - ThreadCache cache = new ThreadCache("testCache", 1024 * 1024L, metrics()); + ThreadCache cache = new ThreadCache(new LogContext("testCache "), 1024 * 1024L, metrics()); @Override public ThreadCache getCache() { http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java index a4a4bd7..73cdb25 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.internals.CacheFlushListener; @@ -69,7 +70,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest { cacheFlushListener = new CacheFlushListenerStub<>(); store = new CachingKeyValueStore<>(underlyingStore, Serdes.String(), Serdes.String()); store.setFlushListener(cacheFlushListener); - cache = new ThreadCache("testCache", maxCacheSizeBytes, new MockStreamsMetrics(new Metrics())); + cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, new MockStreamsMetrics(new Metrics())); context = new MockProcessorContext(null, null, null, (RecordCollector) null, cache); topic = "topic"; context.setRecordContext(new ProcessorRecordContext(10, 0, 0, topic)); http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index 14ac52c..db19294 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.Windowed; @@ -74,7 +75,7 @@ public class CachingSessionStoreTest { Serdes.String(), Segments.segmentInterval(retention, numSegments) ); - cache = new ThreadCache("testCache", MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); + cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); context = new MockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache); context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic")); cachingStore.init(context, cachingStore); http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java index 2621927..ea2c47e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; @@ -77,7 +78,7 @@ public class CachingWindowStoreTest { WINDOW_SIZE, Segments.segmentInterval(retention, numSegments)); cachingStore.setFlushListener(cacheListener); - cache = new ThreadCache("testCache", MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); + cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); topic = "topic"; context = new MockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache); context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, topic)); http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java index 613dbf6..cf07927 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.test.MockProcessorContext; @@ -68,7 +69,7 @@ public class ChangeLoggingKeyValueBytesStoreTest { Serdes.String(), Serdes.Long(), collector, - new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))); + new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))); context.setTime(0); store.init(context, store); } http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java index 6f502c6..8190fd2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.state.KeyValueIterator; @@ -76,7 +77,7 @@ public class ChangeLoggingKeyValueStoreTest { Serdes.String(), Serdes.Long(), collector, - new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))); + new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))); context.setTime(0); store.init(context, store); } http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java index 3e579bc..edc0b94 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -39,7 +40,7 @@ public class MergedSortedCacheKeyValueBytesStoreIteratorTest { @Before public void setUp() throws Exception { store = new InMemoryKeyValueStore<>(namespace, Serdes.Bytes(), Serdes.ByteArray()); - cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); + cache = new ThreadCache(new LogContext("testCache "), 10000L, new MockStreamsMetrics(new Metrics())); } @Test @@ -146,7 +147,7 @@ public class MergedSortedCacheKeyValueBytesStoreIteratorTest { @Test public void shouldPeekNextKey() throws Exception { final KeyValueStore<Bytes, byte[]> kv = new InMemoryKeyValueStore<>("one", Serdes.Bytes(), Serdes.ByteArray()); - final ThreadCache cache = new ThreadCache("testCache", 1000000L, new MockStreamsMetrics(new Metrics())); + final ThreadCache cache = new ThreadCache(new LogContext("testCache "), 1000000L, new MockStreamsMetrics(new Metrics())); byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}}; for (int i = 0; i < bytes.length - 1; i += 2) { kv.put(Bytes.wrap(bytes[i]), bytes[i]);
