This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bbe170a  MINOR: deprecate TaskMetadata constructor and add KIP-740 
notes to upgrade guide (#10755)
bbe170a is described below

commit bbe170af701609180387ad4abbfaa2712936266d
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Wed May 26 10:35:12 2021 -0700

    MINOR: deprecate TaskMetadata constructor and add KIP-740 notes to upgrade 
guide (#10755)
    
    Quick followup to KIP-740 to actually deprecate this constructor, and 
update the upgrade guide with what we changed in KIP-740. I also noticed the 
TaskId#parse method had been modified previously, and should be re-added to the 
public TaskId class. It had no tests, so now it does.
    
    Reviewers: Matthias J. Sax <[email protected]>, Luke Chen 
<[email protected]>
---
 docs/streams/upgrade-guide.html                    |  8 ++++++
 .../org/apache/kafka/streams/processor/TaskId.java | 32 ++++++++++++++++++++++
 .../kafka/streams/processor/TaskMetadata.java      | 13 +++++++++
 .../processor/internals/StateDirectoryTest.java    | 12 ++++++++
 4 files changed, 65 insertions(+)

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 3ae4573..892d025 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -117,6 +117,14 @@
     <p>
         We removed the default implementation of 
<code>RocksDBConfigSetter#close()</code>.
     </p>
+
+    <p>
+        The public <code>topicGroupId</code> and <code>partition</code> fields 
on TaskId have been deprecated and replaced with getters. Please migrate to 
using the new <code>TaskId.subtopology()</code>
+        (which replaces <code>topicGroupId</code>) and 
<code>TaskId.partition()</code> APIs instead. Also, the 
<code>TaskId#readFrom</code> and <code>TaskId#writeTo</code> methods have been 
deprecated
+        and will be removed, as they were never intended for public use. 
Finally, we have deprecated the <code>TaskMetadata.taskId()</code> method as 
well as the <code>TaskMetadata</code> constructor.
+        These have been replaced with APIs that better represent the task id 
as an actual <code>TaskId</code> object instead of a String. Please migrate to 
the new <code>TaskMetadata#getTaskId</code>
+        method. See <a 
href="https://cwiki.apache.org/confluence/x/vYTOCg">KIP-740</a> for more 
details.
+    </p>
     <p>
         We removed the following deprecated APIs:
     </p>
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
index d536302..f4d8349 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
@@ -16,11 +16,14 @@
  */
 package org.apache.kafka.streams.processor;
 
+import org.apache.kafka.streams.errors.TaskIdFormatException;
+
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Objects;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,6 +84,35 @@ public class TaskId implements Comparable<TaskId> {
     }
 
     /**
+     * @throws TaskIdFormatException if the taskIdStr is not a valid {@link 
TaskId}
+     */
+    public static TaskId parse(final String taskIdStr) {
+        final int firstIndex = taskIdStr.indexOf('_');
+        final int secondIndex = taskIdStr.indexOf('_', firstIndex + 1);
+        if (firstIndex <= 0 || firstIndex + 1 >= taskIdStr.length()) {
+            throw new TaskIdFormatException(taskIdStr);
+        }
+
+        try {
+            // If only one copy of '_' exists, there is no named topology in 
the string
+            if (secondIndex < 0) {
+                final int topicGroupId = 
Integer.parseInt(taskIdStr.substring(0, firstIndex));
+                final int partition = 
Integer.parseInt(taskIdStr.substring(firstIndex + 1));
+
+                return new TaskId(topicGroupId, partition);
+            } else {
+                final String namedTopology = taskIdStr.substring(0, 
firstIndex);
+                final int topicGroupId = 
Integer.parseInt(taskIdStr.substring(firstIndex + 1, secondIndex));
+                final int partition = 
Integer.parseInt(taskIdStr.substring(secondIndex + 1));
+
+                return new TaskId(topicGroupId, partition, namedTopology);
+            }
+        } catch (final Exception e) {
+            throw new TaskIdFormatException(taskIdStr);
+        }
+    }
+
+    /**
      * @throws IOException if cannot write to output stream
      * @deprecated since 3.0, for internal use, will be removed
      */
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java
index a35b728..f5a5a69 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java
@@ -40,6 +40,19 @@ public class TaskMetadata {
 
     private final Optional<Long> timeCurrentIdlingStarted;
 
+    /**
+     * @deprecated since 3.0, not intended for public use
+     */
+    @Deprecated
+    public TaskMetadata(final String taskId,
+                        final Set<TopicPartition> topicPartitions,
+                        final Map<TopicPartition, Long> committedOffsets,
+                        final Map<TopicPartition, Long> endOffsets,
+                        final Optional<Long> timeCurrentIdlingStarted) {
+        this(TaskId.parse(taskId), topicPartitions, committedOffsets, 
endOffsets, timeCurrentIdlingStarted);
+    }
+
+    // For internal use -- not a public API
     public TaskMetadata(final TaskId taskId,
                         final Set<TopicPartition> topicPartitions,
                         final Map<TopicPartition, Long> committedOffsets,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index b17169c..fb1a486 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -149,6 +149,18 @@ public class StateDirectoryTest {
     }
 
     @Test
+    public void shouldParseUnnamedTaskId() {
+        final TaskId task = new TaskId(1, 0);
+        assertThat(TaskId.parse(task.toString()), equalTo(task));
+    }
+
+    @Test
+    public void shouldParseNamedTaskId() {
+        final TaskId task = new TaskId(1, 0, "namedTopology");
+        assertThat(TaskId.parse(task.toString()), equalTo(task));
+    }
+
+    @Test
     public void shouldCreateTaskStateDirectory() {
         final TaskId taskId = new TaskId(0, 0);
         final File taskDirectory = 
directory.getOrCreateDirectoryForTask(taskId);

Reply via email to