lucasbru commented on code in PR #12795:
URL: https://github.com/apache/kafka/pull/12795#discussion_r1018951885
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -250,30 +261,31 @@ private void removeUpdatingAndPausedTasks() {
pausedTasks.clear();
}
- private List<TaskAndAction> getTasksAndActions() {
- final List<TaskAndAction> tasksAndActionsToProcess = new
ArrayList<>(tasksAndActions);
- tasksAndActions.clear();
- return tasksAndActionsToProcess;
- }
-
- private void addTask(final Task task) {
- if (isStateless(task)) {
- addToRestoredTasks((StreamTask) task);
- log.info("Stateless active task " + task.id() + " was added to
the restored tasks of the state updater");
- } else {
- final Task existingTask = updatingTasks.putIfAbsent(task.id(),
task);
- if (existingTask != null) {
- throw new IllegalStateException((existingTask.isActive() ?
"Active" : "Standby") + " task " + task.id() + " already exist, " +
- "should not try to add another " + (task.isActive() ?
"active" : "standby") + " task with the same id. " + BUG_ERROR_MESSAGE);
+ private void initializeTask(final Task task) {
+ boolean initializationFailed = false;
+ if (task.state() == Task.State.CREATED) {
+ try {
+ task.initializeIfNeeded();
+ } catch (final StreamsException streamsException) {
+
addToExceptionsAndFailedTasksThenRemoveFromUpdatingTasks(new
ExceptionAndTasks(Collections.singleton(task), streamsException));
+ initializationFailed = true;
}
- changelogReader.register(task.changelogPartitions(),
task.stateManager());
- if (task.isActive()) {
- log.info("Stateful active task " + task.id() + " was added
to the state updater");
- changelogReader.enforceRestoreActive();
+ }
+ if (!initializationFailed) {
Review Comment:
Good idea, done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org