This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4a3e92b1abb KAFKA-10199: Expose read only task from state updater 
(#12497)
4a3e92b1abb is described below

commit 4a3e92b1abb562a27b6a461b3dfedde2d15fd3b2
Author: Bruno Cadonna <[email protected]>
AuthorDate: Fri Aug 12 17:03:50 2022 +0200

    KAFKA-10199: Expose read only task from state updater (#12497)
    
    The state updater exposes tasks that are in restoration
    to the stream thread. To ensure that the stream thread
    only reads from those tasks without modifying any
    internal state, this PR introduces a read-only task
    that throws an exception if the caller tries to modify
    the internal state of the task.
    
    This PR also returns read-only tasks from
    DefaultStateUpdater#getTasks().
    
    Reviewer: Guozhang Wang <[email protected]>
---
 .../processor/internals/DefaultStateUpdater.java   |   2 +-
 .../processor/internals/ProcessorStateManager.java |  14 +-
 .../streams/processor/internals/ReadOnlyTask.java  | 220 +++++++++++++++++++++
 .../internals/DefaultStateUpdaterTest.java         |  36 +---
 .../processor/internals/ReadOnlyTaskTest.java      | 193 ++++++++++++++++++
 5 files changed, 430 insertions(+), 35 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index 7e7ec2a6f78..803b96bf2ce 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -562,7 +562,7 @@ public class DefaultStateUpdater implements StateUpdater {
 
     @Override
     public Set<Task> getTasks() {
-        return executeWithQueuesLocked(() -> 
getStreamOfTasks().collect(Collectors.toSet()));
+        return executeWithQueuesLocked(() -> 
getStreamOfTasks().map(ReadOnlyTask::new).collect(Collectors.toSet()));
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 2a67a568a21..d80a1edd400 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -43,6 +43,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -366,20 +367,21 @@ public class ProcessorStateManager implements 
StateManager {
     }
 
     Set<TopicPartition> changelogPartitions() {
-        return changelogOffsets().keySet();
+        return Collections.unmodifiableSet(changelogOffsets().keySet());
     }
 
     void markChangelogAsCorrupted(final Collection<TopicPartition> partitions) 
{
+        final Collection<TopicPartition> partitionsToMarkAsCorrupted = new 
LinkedList<>(partitions);
         for (final StateStoreMetadata storeMetadata : stores.values()) {
-            if (partitions.contains(storeMetadata.changelogPartition)) {
+            if 
(partitionsToMarkAsCorrupted.contains(storeMetadata.changelogPartition)) {
                 storeMetadata.corrupted = true;
-                partitions.remove(storeMetadata.changelogPartition);
+                
partitionsToMarkAsCorrupted.remove(storeMetadata.changelogPartition);
             }
         }
 
-        if (!partitions.isEmpty()) {
-            throw new IllegalStateException("Some partitions " + partitions + 
" are not contained in the store list of task " +
-                taskId + " marking as corrupted, this is not expected");
+        if (!partitionsToMarkAsCorrupted.isEmpty()) {
+            throw new IllegalStateException("Some partitions " + 
partitionsToMarkAsCorrupted + " are not contained in " +
+                "the store list of task " + taskId + " marking as corrupted, 
this is not expected");
         }
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
new file mode 100644
index 00000000000..00c1006b85e
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+
+public class ReadOnlyTask implements Task {
+
+    private final Task task;
+
+    public ReadOnlyTask(final Task task) {
+        this.task = task;
+    }
+
+    @Override
+    public TaskId id() {
+        return task.id();
+    }
+
+    @Override
+    public boolean isActive() {
+        return task.isActive();
+    }
+
+    @Override
+    public Set<TopicPartition> inputPartitions() {
+        return task.inputPartitions();
+    }
+
+    @Override
+    public Set<TopicPartition> changelogPartitions() {
+        return task.changelogPartitions();
+    }
+
+    @Override
+    public State state() {
+        return task.state();
+    }
+
+    @Override
+    public boolean commitRequested() {
+        return task.commitRequested();
+    }
+
+    @Override
+    public boolean needsInitializationOrRestoration() {
+        return task.needsInitializationOrRestoration();
+    }
+
+    @Override
+    public void initializeIfNeeded() {
+        throw new UnsupportedOperationException("This task is read-only");
+    }
+
+    @Override
+    public void addPartitionsForOffsetReset(final Set<TopicPartition> 
partitionsForOffsetReset) {
+        throw new UnsupportedOperationException("This task is read-only");
+    }
+
+    @Override
+    public void completeRestoration(final Consumer<Set<TopicPartition>> 
offsetResetter) {
+        throw new UnsupportedOperationException("This task is read-only");
+    }
+
+    @Override
+    public void suspend() {
+        throw new UnsupportedOperationException("This task is read-only");
+    }
+
+    @Override
+    public void resume() {
+        throw new UnsupportedOperationException("This task is read-only");
+    }
+
+    @Override
+    public void closeDirty() {
+        throw new UnsupportedOperationException("This task is read-only");
+    }
+
+    @Override
+    public void closeClean() {
+        throw new UnsupportedOperationException("This task is read-only");
+    }
+
+    @Override
+    public void updateInputPartitions(final Set<TopicPartition> 
topicPartitions,
+                                      final Map<String, List<String>> 
allTopologyNodesToSourceTopics) {
+        throw new UnsupportedOperationException("This task is read-only");
+    }
+
+    @Override
+    public void maybeCheckpoint(final boolean enforceCheckpoint) {
+        throw new UnsupportedOperationException("This task is read-only");
+    }
+
+    @Override
+    public void markChangelogAsCorrupted(final Collection<TopicPartition> 
partitions) {
+        throw new UnsupportedOperationException("This task is read-only");
+    }
+
+    @Override
+    public void revive() {
+        throw new UnsupportedOperationException("This task is read-only");
+    }
+
+    @Override
+    public void prepareRecycle() {
+        throw new UnsupportedOperationException("This task is read-only");
+    }
+
+    @Override
+    public void addRecords(final TopicPartition partition, final 
Iterable<ConsumerRecord<byte[], byte[]>> records) {
+        throw new UnsupportedOperationException("This task is read-only");
+    }
+
+    @Override
+    public boolean process(final long wallClockTime) {
+        throw new UnsupportedOperationException("This task is read-only");
+    }
+
+    @Override
+    public void recordProcessBatchTime(final long processBatchTime) {
+        throw new UnsupportedOperationException("This task is read-only");
+    }
+
+    @Override
+    public void recordProcessTimeRatioAndBufferSize(final long 
allTaskProcessMs, final long now) {
+        throw new UnsupportedOperationException("This task is read-only");
+    }
+
+    @Override
+    public boolean maybePunctuateStreamTime() {
+        throw new UnsupportedOperationException("This task is read-only");
+    }
+
+    @Override
+    public boolean maybePunctuateSystemTime() {
+        throw new UnsupportedOperationException("This task is read-only");
+    }
+
+    @Override
+    public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+        throw new UnsupportedOperationException("This task is read-only");
+    }
+
+    @Override
+    public void postCommit(final boolean enforceCheckpoint) {
+        throw new UnsupportedOperationException("This task is read-only");
+    }
+
+    @Override
+    public Map<TopicPartition, Long> purgeableOffsets() {
+        throw new UnsupportedOperationException("This task is read-only");
+    }
+
+    @Override
+    public void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs, 
final Exception cause) {
+        throw new UnsupportedOperationException("This task is read-only");
+    }
+
+    @Override
+    public void clearTaskTimeout() {
+        throw new UnsupportedOperationException("This task is read-only");
+    }
+
+    @Override
+    public boolean commitNeeded() {
+        throw new UnsupportedOperationException("This task is read-only");
+    }
+
+    @Override
+    public StateStore getStore(final String name) {
+        throw new UnsupportedOperationException("This task is read-only");
+    }
+
+    @Override
+    public Map<TopicPartition, Long> changelogOffsets() {
+        throw new UnsupportedOperationException("This task is read-only");
+    }
+
+    @Override
+    public Map<TopicPartition, Long> committedOffsets() {
+        throw new UnsupportedOperationException("This task is read-only");
+    }
+
+    @Override
+    public Map<TopicPartition, Long> highWaterMark() {
+        throw new UnsupportedOperationException("This task is read-only");
+    }
+
+    @Override
+    public Optional<Long> timeCurrentIdlingStarted() {
+        throw new UnsupportedOperationException("This task is read-only");
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index adc417ede6f..e5718f53077 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -39,6 +39,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -1118,20 +1119,7 @@ class DefaultStateUpdaterTest {
         stateUpdater.add(activeTask2);
         stateUpdater.add(standbyTask3);
 
-        final Set<Task> tasks = stateUpdater.getTasks();
-
-        assertEquals(5, tasks.size());
-        assertTrue(tasks.containsAll(mkSet(activeTask1, activeTask2, 
standbyTask1, standbyTask2, standbyTask3)));
-
-        final Set<StreamTask> activeTasks = stateUpdater.getActiveTasks();
-
-        assertEquals(2, activeTasks.size());
-        assertTrue(activeTasks.containsAll(mkSet(activeTask1, activeTask2)));
-
-        final Set<StandbyTask> standbyTasks = stateUpdater.getStandbyTasks();
-
-        assertEquals(3, standbyTasks.size());
-        assertTrue(standbyTasks.containsAll(mkSet(standbyTask1, standbyTask2, 
standbyTask3)));
+        verifyGetTasks(mkSet(activeTask1, activeTask2), mkSet(standbyTask1, 
standbyTask2, standbyTask3));
     }
 
     @Test
@@ -1153,18 +1141,7 @@ class DefaultStateUpdaterTest {
 
         final Set<Task> tasks = stateUpdater.getTasks();
 
-        assertEquals(5, tasks.size());
-        assertTrue(tasks.containsAll(mkSet(activeTask1, activeTask2, 
standbyTask1, standbyTask2, standbyTask3)));
-
-        final Set<StreamTask> activeTasks = stateUpdater.getActiveTasks();
-
-        assertEquals(2, activeTasks.size());
-        assertTrue(activeTasks.containsAll(mkSet(activeTask1, activeTask2)));
-
-        final Set<StandbyTask> standbyTasks = stateUpdater.getStandbyTasks();
-
-        assertEquals(3, standbyTasks.size());
-        assertTrue(standbyTasks.containsAll(mkSet(standbyTask1, standbyTask2, 
standbyTask3)));
+        verifyGetTasks(mkSet(activeTask1, activeTask2), mkSet(standbyTask1, 
standbyTask2, standbyTask3));
     }
 
     @Test
@@ -1262,10 +1239,13 @@ class DefaultStateUpdaterTest {
                                 final Set<StandbyTask> expectedStandbyTasks) {
         final Set<Task> tasks = stateUpdater.getTasks();
 
+        assertEquals(expectedActiveTasks.size() + expectedStandbyTasks.size(), 
tasks.size());
+        tasks.forEach(task -> assertTrue(task instanceof ReadOnlyTask));
+        final Set<TaskId> actualTaskIds = 
tasks.stream().map(Task::id).collect(Collectors.toSet());
         final Set<Task> expectedTasks = new HashSet<>(expectedActiveTasks);
         expectedTasks.addAll(expectedStandbyTasks);
-        assertEquals(expectedActiveTasks.size() + expectedStandbyTasks.size(), 
tasks.size());
-        assertTrue(tasks.containsAll(expectedTasks));
+        final Set<TaskId> expectedTaskIds = 
expectedTasks.stream().map(Task::id).collect(Collectors.toSet());
+        assertTrue(actualTaskIds.containsAll(expectedTaskIds));
 
         final Set<StreamTask> activeTasks = stateUpdater.getActiveTasks();
         assertEquals(expectedActiveTasks.size(), activeTasks.size());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java
new file mode 100644
index 00000000000..cd5da873981
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Consumer;
+
+import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.verify;
+
+class ReadOnlyTaskTest {
+
+    private final List<String> readOnlyMethods = new LinkedList<String>() {
+        {
+            add("needsInitializationOrRestoration");
+            add("inputPartitions");
+            add("changelogPartitions");
+            add("commitRequested");
+            add("isActive");
+            add("state");
+            add("id");
+        }
+    };
+
+    private final List<String> objectMethods = new LinkedList<String>() {
+        {
+            add("wait");
+            add("equals");
+            add("getClass");
+            add("hashCode");
+            add("notify");
+            add("notifyAll");
+            add("toString");
+        }
+    };
+
+    final Task task = statelessTask(new TaskId(1, 0)).build();
+
+    @Test
+    public void shouldDelegateNeedsInitializationOrRestoration() {
+        final ReadOnlyTask readOnlyTask = new ReadOnlyTask(task);
+
+        readOnlyTask.needsInitializationOrRestoration();
+
+        verify(task).needsInitializationOrRestoration();
+    }
+
+    @Test
+    public void shouldDelegateId() {
+        final ReadOnlyTask readOnlyTask = new ReadOnlyTask(task);
+
+        readOnlyTask.id();
+
+        verify(task).id();
+    }
+
+    @Test
+    public void shouldDelegateIsActive() {
+        final ReadOnlyTask readOnlyTask = new ReadOnlyTask(task);
+
+        readOnlyTask.isActive();
+
+        verify(task).isActive();
+    }
+
+    @Test
+    public void shouldDelegateInputPartitions() {
+        final ReadOnlyTask readOnlyTask = new ReadOnlyTask(task);
+
+        readOnlyTask.inputPartitions();
+
+        verify(task).inputPartitions();
+    }
+
+    @Test
+    public void shouldDelegateChangelogPartitions() {
+        final ReadOnlyTask readOnlyTask = new ReadOnlyTask(task);
+
+        readOnlyTask.changelogPartitions();
+
+        verify(task).changelogPartitions();
+    }
+
+    @Test
+    public void shouldDelegateCommitRequested() {
+        final ReadOnlyTask readOnlyTask = new ReadOnlyTask(task);
+
+        readOnlyTask.commitRequested();
+
+        verify(task).commitRequested();
+    }
+
+    @Test
+    public void shouldDelegateState() {
+        final ReadOnlyTask readOnlyTask = new ReadOnlyTask(task);
+
+        readOnlyTask.state();
+
+        verify(task).state();
+    }
+
+    @Test
+    public void shouldThrowUnsupportedOperationExceptionForForbiddenMethods() {
+        final ReadOnlyTask readOnlyTask = new ReadOnlyTask(task);
+        for (final Method method : ReadOnlyTask.class.getMethods()) {
+            final String methodName = method.getName();
+            if (!readOnlyMethods.contains(methodName) && 
!objectMethods.contains(methodName)) {
+                shouldThrowUnsupportedOperationException(readOnlyTask, method);
+            }
+        }
+
+    }
+
+    private void shouldThrowUnsupportedOperationException(final ReadOnlyTask 
readOnlyTask,
+                                                          final Method method) 
{
+        final Exception exception = assertThrows(
+            UnsupportedOperationException.class,
+            () -> {
+                try {
+                    method.invoke(readOnlyTask, 
getParameters(method.getParameterTypes()));
+                } catch (final InvocationTargetException 
invocationTargetException) {
+                    throw invocationTargetException.getCause();
+                }
+            },
+            "Something unexpected happened during invocation of method '" + 
method.getName() + "'!"
+        );
+        assertEquals("This task is read-only", exception.getMessage());
+    }
+
+    private Object[] getParameters(final Class<?>[] parameterTypes) throws 
Exception {
+        final Object[] parameters = new Object[parameterTypes.length];
+
+        for (int i = 0; i < parameterTypes.length; ++i) {
+            switch (parameterTypes[i].getName()) {
+                case "boolean":
+                    parameters[i] = true;
+                    break;
+                case "long":
+                    parameters[i] = 0;
+                    break;
+                case "java.util.Set":
+                    parameters[i] = Collections.emptySet();
+                    break;
+                case "java.util.Collection":
+                    parameters[i] = Collections.emptySet();
+                    break;
+                case "java.util.Map":
+                    parameters[i] = Collections.emptyMap();
+                    break;
+                case "org.apache.kafka.common.TopicPartition":
+                    parameters[i] = new TopicPartition("topic", 0);
+                    break;
+                case "java.lang.Exception":
+                    parameters[i] = new IllegalStateException();
+                    break;
+                case "java.util.function.Consumer":
+                    parameters[i] = (Consumer) ignored -> { };
+                    break;
+                case "java.lang.Iterable":
+                    parameters[i] = Collections.emptySet();
+                    break;
+                default:
+                    parameters[i] = 
parameterTypes[i].getConstructor().newInstance();
+            }
+        }
+
+        return parameters;
+    }
+}
\ No newline at end of file

Reply via email to the mailing list.