This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 970d8930ac7 KAFKA-18159 Remove onPartitionsRevoked and
onPartitionsAssigned from SinkTask (#18049)
970d8930ac7 is described below
commit 970d8930ac7b0cff8d042d96f8bedb2a0b55f9a9
Author: Nick Guo <[email protected]>
AuthorDate: Fri Dec 6 04:28:34 2024 +0800
KAFKA-18159 Remove onPartitionsRevoked and onPartitionsAssigned from
SinkTask (#18049)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../java/org/apache/kafka/connect/sink/SinkTask.java | 16 ----------------
.../kafka/connect/integration/BlockingConnectorTest.java | 16 ----------------
docs/upgrade.html | 3 +++
3 files changed, 3 insertions(+), 32 deletions(-)
diff --git
a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
index f4e25979f90..655c89ac670 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
@@ -152,14 +152,6 @@ public abstract class SinkTask implements Task {
* partitions previously assigned to the task)
*/
public void open(Collection<TopicPartition> partitions) {
- this.onPartitionsAssigned(partitions);
- }
-
- /**
- * @deprecated Use {@link #open(Collection)} for partition initialization.
- */
- @Deprecated
- public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
/**
@@ -175,14 +167,6 @@ public abstract class SinkTask implements Task {
* @param partitions The list of partitions that should be closed
*/
public void close(Collection<TopicPartition> partitions) {
- this.onPartitionsRevoked(partitions);
- }
-
- /**
- * @deprecated Use {@link #close(Collection)} instead for partition
cleanup.
- */
- @Deprecated
- public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
}
/**
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
index 90a0e96a78a..948dcfaf159 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
@@ -109,9 +109,7 @@ public class BlockingConnectorTest {
private static final String SINK_TASK_FLUSH = "SinkTask::flush";
private static final String SINK_TASK_PRE_COMMIT = "SinkTask::preCommit";
private static final String SINK_TASK_OPEN = "SinkTask::open";
- private static final String SINK_TASK_ON_PARTITIONS_ASSIGNED =
"SinkTask::onPartitionsAssigned";
private static final String SINK_TASK_CLOSE = "SinkTask::close";
- private static final String SINK_TASK_ON_PARTITIONS_REVOKED =
"SinkTask::onPartitionsRevoked";
private static final String SOURCE_TASK_INITIALIZE =
"SourceTask::initialize";
private static final String SOURCE_TASK_POLL = "SourceTask::poll";
private static final String SOURCE_TASK_COMMIT = "SourceTask::commit";
@@ -869,25 +867,11 @@ public class BlockingConnectorTest {
super.open(partitions);
}
- @Override
- @SuppressWarnings("deprecation")
- public void onPartitionsAssigned(Collection<TopicPartition>
partitions) {
- block.maybeBlockOn(SINK_TASK_ON_PARTITIONS_ASSIGNED);
- super.onPartitionsAssigned(partitions);
- }
-
@Override
public void close(Collection<TopicPartition> partitions) {
block.maybeBlockOn(SINK_TASK_CLOSE);
super.close(partitions);
}
-
- @Override
- @SuppressWarnings("deprecation")
- public void onPartitionsRevoked(Collection<TopicPartition>
partitions) {
- block.maybeBlockOn(SINK_TASK_ON_PARTITIONS_REVOKED);
- super.onPartitionsRevoked(partitions);
- }
}
}
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 3c89af345d5..8e2aa2915f4 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -137,6 +137,9 @@
<li>The <code>whitelist</code> and
<code>blacklist</code> configurations were removed from the
<code>org.apache.kafka.connect.transforms.ReplaceField</code> transformation.
Please use <code>include</code> and
<code>exclude</code> respectively instead.
</li>
+ <li>The
<code>onPartitionsRevoked(Collection<TopicPartition>)</code> and
<code>onPartitionsAssigned(Collection<TopicPartition>)</code> methods
+ were removed from <code>SinkTask</code>.
+ </li>
</ul>
</li>
<li><b>Consumer</b>