This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bbe170a MINOR: deprecate TaskMetadata constructor and add KIP-740
notes to upgrade guide (#10755)
bbe170a is described below
commit bbe170af701609180387ad4abbfaa2712936266d
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Wed May 26 10:35:12 2021 -0700
MINOR: deprecate TaskMetadata constructor and add KIP-740 notes to upgrade
guide (#10755)
Quick follow-up to KIP-740 to actually deprecate this constructor, and
update the upgrade guide with what we changed in KIP-740. I also noticed the
TaskId#parse method had been modified previously, and should be re-added to the
public TaskId class. It had no tests, so now it does.
Reviewers: Matthias J. Sax <[email protected]>, Luke Chen
<[email protected]>
---
docs/streams/upgrade-guide.html | 8 ++++++
.../org/apache/kafka/streams/processor/TaskId.java | 32 ++++++++++++++++++++++
.../kafka/streams/processor/TaskMetadata.java | 13 +++++++++
.../processor/internals/StateDirectoryTest.java | 12 ++++++++
4 files changed, 65 insertions(+)
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 3ae4573..892d025 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -117,6 +117,14 @@
<p>
We removed the default implementation of
<code>RocksDBConfigSetter#close()</code>.
</p>
+
+ <p>
+ The public <code>topicGroupId</code> and <code>partition</code> fields
on TaskId have been deprecated and replaced with getters. Please migrate to
using the new <code>TaskId.subtopology()</code>
+ (which replaces <code>topicGroupId</code>) and
<code>TaskId.partition()</code> APIs instead. Also, the
<code>TaskId#readFrom</code> and <code>TaskId#writeTo</code> methods have been
deprecated
+ and will be removed, as they were never intended for public use.
Finally, we have deprecated the <code>TaskMetadata.taskId()</code> method as
well as the <code>TaskMetadata</code> constructor.
+ These have been replaced with APIs that better represent the task id
as an actual <code>TaskId</code> object instead of a String. Please migrate to
the new <code>TaskMetadata#getTaskId</code>
+ method. See <a
href="https://cwiki.apache.org/confluence/x/vYTOCg">KIP-740</a> for more
details.
+ </p>
<p>
We removed the following deprecated APIs:
</p>
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
index d536302..f4d8349 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
@@ -16,11 +16,14 @@
*/
package org.apache.kafka.streams.processor;
+import org.apache.kafka.streams.errors.TaskIdFormatException;
+
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,6 +84,35 @@ public class TaskId implements Comparable<TaskId> {
}
/**
+ * @throws TaskIdFormatException if the taskIdStr is not a valid {@link
TaskId}
+ */
+ public static TaskId parse(final String taskIdStr) {
+ final int firstIndex = taskIdStr.indexOf('_');
+ final int secondIndex = taskIdStr.indexOf('_', firstIndex + 1);
+ if (firstIndex <= 0 || firstIndex + 1 >= taskIdStr.length()) {
+ throw new TaskIdFormatException(taskIdStr);
+ }
+
+ try {
+ // If only one copy of '_' exists, there is no named topology in
the string
+ if (secondIndex < 0) {
+ final int topicGroupId =
Integer.parseInt(taskIdStr.substring(0, firstIndex));
+ final int partition =
Integer.parseInt(taskIdStr.substring(firstIndex + 1));
+
+ return new TaskId(topicGroupId, partition);
+ } else {
+ final String namedTopology = taskIdStr.substring(0,
firstIndex);
+ final int topicGroupId =
Integer.parseInt(taskIdStr.substring(firstIndex + 1, secondIndex));
+ final int partition =
Integer.parseInt(taskIdStr.substring(secondIndex + 1));
+
+ return new TaskId(topicGroupId, partition, namedTopology);
+ }
+ } catch (final Exception e) {
+ throw new TaskIdFormatException(taskIdStr);
+ }
+ }
+
+ /**
* @throws IOException if cannot write to output stream
* @deprecated since 3.0, for internal use, will be removed
*/
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java
b/streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java
index a35b728..f5a5a69 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java
@@ -40,6 +40,19 @@ public class TaskMetadata {
private final Optional<Long> timeCurrentIdlingStarted;
+ /**
+ * @deprecated since 3.0, not intended for public use
+ */
+ @Deprecated
+ public TaskMetadata(final String taskId,
+ final Set<TopicPartition> topicPartitions,
+ final Map<TopicPartition, Long> committedOffsets,
+ final Map<TopicPartition, Long> endOffsets,
+ final Optional<Long> timeCurrentIdlingStarted) {
+ this(TaskId.parse(taskId), topicPartitions, committedOffsets,
endOffsets, timeCurrentIdlingStarted);
+ }
+
+ // For internal use -- not a public API
public TaskMetadata(final TaskId taskId,
final Set<TopicPartition> topicPartitions,
final Map<TopicPartition, Long> committedOffsets,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index b17169c..fb1a486 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -149,6 +149,18 @@ public class StateDirectoryTest {
}
@Test
+ public void shouldParseUnnamedTaskId() {
+ final TaskId task = new TaskId(1, 0);
+ assertThat(TaskId.parse(task.toString()), equalTo(task));
+ }
+
+ @Test
+ public void shouldParseNamedTaskId() {
+ final TaskId task = new TaskId(1, 0, "namedTopology");
+ assertThat(TaskId.parse(task.toString()), equalTo(task));
+ }
+
+ @Test
public void shouldCreateTaskStateDirectory() {
final TaskId taskId = new TaskId(0, 0);
final File taskDirectory =
directory.getOrCreateDirectoryForTask(taskId);