This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 19d8a414ef6 KAFKA-15900, KAFKA-18310: fix flaky test
testOutdatedCoordinatorAssignment and AbstractCoordinatorTest (#18945)
19d8a414ef6 is described below
commit 19d8a414ef6d86596d5f8d33828d8d7560cc8678
Author: PoAn Yang <[email protected]>
AuthorDate: Mon Mar 10 23:50:35 2025 +0800
KAFKA-15900, KAFKA-18310: fix flaky test testOutdatedCoordinatorAssignment
and AbstractCoordinatorTest (#18945)
Reviewers: Lianet Magrans <[email protected]>
---
.../consumer/internals/AbstractCoordinator.java | 74 ++++++++++------------
.../consumer/internals/BaseHeartbeatThread.java | 69 ++++++++++++++++++++
.../consumer/internals/ConsumerCoordinator.java | 41 +++++++++++-
.../internals/AbstractCoordinatorTest.java | 60 ++++++++----------
.../internals/BaseHeartbeatThreadTest.java | 65 +++++++++++++++++++
.../internals/ConsumerCoordinatorTest.java | 28 +++++++-
6 files changed, 260 insertions(+), 77 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index b530ca562b9..f8165f6656d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -67,7 +67,6 @@ import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryProvider;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.utils.ExponentialBackoff;
-import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
@@ -84,7 +83,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
/**
* AbstractCoordinator implements group management for a single group member
by interacting with
@@ -135,6 +134,7 @@ public abstract class AbstractCoordinator implements
Closeable {
private final GroupCoordinatorMetrics sensors;
private final GroupRebalanceConfig rebalanceConfig;
private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
+ private final Optional<Supplier<BaseHeartbeatThread>>
heartbeatThreadSupplier;
protected final Time time;
protected final ConsumerNetworkClient client;
@@ -144,7 +144,7 @@ public abstract class AbstractCoordinator implements
Closeable {
private String rejoinReason = "";
private boolean rejoinNeeded = true;
private boolean needsJoinPrepare = true;
- private HeartbeatThread heartbeatThread = null;
+ private BaseHeartbeatThread heartbeatThread = null;
private RequestFuture<ByteBuffer> joinFuture = null;
private RequestFuture<Void> findCoordinatorFuture = null;
private volatile RuntimeException fatalFindCoordinatorException = null;
@@ -165,7 +165,7 @@ public abstract class AbstractCoordinator implements
Closeable {
Metrics metrics,
String metricGrpPrefix,
Time time) {
- this(rebalanceConfig, logContext, client, metrics, metricGrpPrefix,
time, Optional.empty());
+ this(rebalanceConfig, logContext, client, metrics, metricGrpPrefix,
time, Optional.empty(), Optional.empty());
}
public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig,
@@ -174,7 +174,8 @@ public abstract class AbstractCoordinator implements
Closeable {
Metrics metrics,
String metricGrpPrefix,
Time time,
- Optional<ClientTelemetryReporter>
clientTelemetryReporter) {
+ Optional<ClientTelemetryReporter>
clientTelemetryReporter,
+ Optional<Supplier<BaseHeartbeatThread>>
heartbeatThreadSupplier) {
Objects.requireNonNull(rebalanceConfig.groupId,
"Expected a non-null group id for coordinator
construction");
this.rebalanceConfig = rebalanceConfig;
@@ -189,6 +190,7 @@ public abstract class AbstractCoordinator implements
Closeable {
this.heartbeat = new Heartbeat(rebalanceConfig, time);
this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix);
this.clientTelemetryReporter = clientTelemetryReporter;
+ this.heartbeatThreadSupplier = heartbeatThreadSupplier;
}
/**
@@ -361,7 +363,7 @@ public abstract class AbstractCoordinator implements
Closeable {
*/
protected synchronized void pollHeartbeat(long now) {
if (heartbeatThread != null) {
- if (heartbeatThread.hasFailed()) {
+ if (heartbeatThread.isFailed()) {
// set the heartbeat thread to null and raise an exception. If
the user catches it,
// the next call to ensureActiveGroup() will spawn a new
heartbeat thread.
RuntimeException cause = heartbeatThread.failureCause();
@@ -381,7 +383,7 @@ public abstract class AbstractCoordinator implements
Closeable {
// we don't need to send heartbeats
if (state.hasNotJoinedGroup())
return Long.MAX_VALUE;
- if (heartbeatThread != null && heartbeatThread.hasFailed()) {
+ if (heartbeatThread != null && heartbeatThread.isFailed()) {
// if an exception occurs in the heartbeat thread, raise it.
throw heartbeatThread.failureCause();
}
@@ -417,13 +419,13 @@ public abstract class AbstractCoordinator implements
Closeable {
private synchronized void startHeartbeatThreadIfNeeded() {
if (heartbeatThread == null) {
- heartbeatThread = new HeartbeatThread();
+ heartbeatThread =
heartbeatThreadSupplier.orElse(HeartbeatThread::new).get();
heartbeatThread.start();
}
}
private void closeHeartbeatThread() {
- HeartbeatThread thread;
+ BaseHeartbeatThread thread;
synchronized (this) {
if (heartbeatThread == null)
return;
@@ -1330,6 +1332,13 @@ public abstract class AbstractCoordinator implements
Closeable {
String.format("The total number of %s",
descriptiveName)));
}
+ /**
+ * Returns the heartbeat thread currently held by this coordinator, or {@code null} if none
+ * has been started yet or it has since been cleared (e.g. after a heartbeat failure is
+ * surfaced by pollHeartbeat). Visible for testing.
+ * <p>
+ * NOTE(review): this read is not synchronized and the field is not volatile, whereas the
+ * field is assigned inside the synchronized startHeartbeatThreadIfNeeded(); a caller on a
+ * different thread may observe a stale value.
+ */
+ protected BaseHeartbeatThread heartbeatThread() {
+ return heartbeatThread;
+ }
+
private class GroupCoordinatorMetrics {
public final String metricGrpName;
@@ -1436,56 +1445,40 @@ public abstract class AbstractCoordinator implements
Closeable {
}
}
- private class HeartbeatThread extends KafkaThread implements AutoCloseable
{
- private boolean enabled = false;
- private boolean closed = false;
- private final AtomicReference<RuntimeException> failed = new
AtomicReference<>(null);
+ private class HeartbeatThread extends BaseHeartbeatThread {
private HeartbeatThread() {
super(HEARTBEAT_THREAD_PREFIX + (rebalanceConfig.groupId.isEmpty()
? "" : " | " + rebalanceConfig.groupId), true);
}
+ @Override
public void enable() {
synchronized (AbstractCoordinator.this) {
log.debug("Enabling heartbeat thread");
- this.enabled = true;
+ super.enable();
heartbeat.resetTimeouts();
AbstractCoordinator.this.notify();
}
}
- public void disable() {
- synchronized (AbstractCoordinator.this) {
- log.debug("Disabling heartbeat thread");
- this.enabled = false;
- }
- }
-
+ @Override
public void close() {
synchronized (AbstractCoordinator.this) {
- this.closed = true;
+ super.close();
AbstractCoordinator.this.notify();
}
}
- private boolean hasFailed() {
- return failed.get() != null;
- }
-
- private RuntimeException failureCause() {
- return failed.get();
- }
-
@Override
public void run() {
try {
log.debug("Heartbeat thread started");
while (true) {
synchronized (AbstractCoordinator.this) {
- if (closed)
+ if (isClosed())
return;
- if (!enabled) {
+ if (!isEnabled()) {
AbstractCoordinator.this.wait();
continue;
}
@@ -1493,7 +1486,7 @@ public abstract class AbstractCoordinator implements
Closeable {
// we do not need to heartbeat we are not part of a
group yet;
// also if we already have fatal error, the client
will be
// crashed soon, hence we do not need to continue
heartbeating either
- if (state.hasNotJoinedGroup() || hasFailed()) {
+ if (state.hasNotJoinedGroup() || isFailed()) {
disable();
continue;
}
@@ -1547,7 +1540,7 @@ public abstract class AbstractCoordinator implements
Closeable {
heartbeat.receiveHeartbeat();
} else if (e instanceof
FencedInstanceIdException) {
log.error("Caught fenced
group.instance.id {} error in heartbeat thread",
rebalanceConfig.groupInstanceId);
- heartbeatThread.failed.set(e);
+ setFailureCause(e);
} else {
heartbeat.failHeartbeat();
// wake up the thread if it's
sleeping to reschedule the heartbeat
@@ -1561,28 +1554,27 @@ public abstract class AbstractCoordinator implements
Closeable {
}
} catch (AuthenticationException e) {
log.error("An authentication error occurred in the heartbeat
thread", e);
- this.failed.set(e);
+ setFailureCause(e);
} catch (GroupAuthorizationException e) {
log.error("A group authorization error occurred in the
heartbeat thread", e);
- this.failed.set(e);
+ setFailureCause(e);
} catch (InterruptedException | InterruptException e) {
Thread.interrupted();
log.error("Unexpected interrupt received in heartbeat thread",
e);
- this.failed.set(new RuntimeException(e));
+ setFailureCause(new RuntimeException(e));
} catch (Throwable e) {
log.error("Heartbeat thread failed due to unexpected error",
e);
if (e instanceof RuntimeException)
- this.failed.set((RuntimeException) e);
+ setFailureCause((RuntimeException) e);
else
- this.failed.set(new RuntimeException(e));
+ setFailureCause(new RuntimeException(e));
} finally {
log.debug("Heartbeat thread has closed");
synchronized (AbstractCoordinator.this) {
- this.closed = true;
+ super.close();
}
}
}
-
}
protected static class Generation {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/BaseHeartbeatThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/BaseHeartbeatThread.java
new file mode 100644
index 00000000000..c9d96d807e3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/BaseHeartbeatThread.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.utils.KafkaThread;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Base class for heartbeat threads. This class provides a mechanism to enable/disable the heartbeat thread.
+ * The heartbeat thread should check whether it's enabled by calling {@link BaseHeartbeatThread#isEnabled()}
+ * before sending heartbeat requests.
+ * <p>
+ * Besides the enabled flag, it tracks whether the thread has been closed and the last recorded
+ * failure cause. Each piece of state lives in its own atomic variable, so individual get/set calls
+ * are safe from any thread; compound check-then-act sequences across calls are not atomic.
+ * <p>
+ * The state methods only flip flags: they never start, interrupt, wake or join the thread.
+ * Subclasses whose run loop blocks while disabled must override {@link #enable()} and
+ * {@link #close()} to also wake that loop.
+ */
+public class BaseHeartbeatThread extends KafkaThread implements AutoCloseable {
+ // Whether heartbeats should currently be sent; starts false.
+ private final AtomicBoolean enabled = new AtomicBoolean(false);
+ // Set once close() has been called; starts false.
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ // Last failure recorded via setFailureCause; null means "not failed".
+ private final AtomicReference<RuntimeException> failureCause = new
AtomicReference<>(null);
+
+ /**
+ * @param name thread name, passed to KafkaThread
+ * @param daemon passed to KafkaThread (presumably whether the thread is a daemon thread)
+ */
+ public BaseHeartbeatThread(String name, boolean daemon) {
+ super(name, daemon);
+ }
+
+ /** Sets the enabled flag. Does not notify a thread that is waiting. */
+ public void enable() {
+ enabled.set(true);
+ }
+
+ /** Clears the enabled flag. */
+ public void disable() {
+ enabled.set(false);
+ }
+
+ /** @return whether the thread is currently enabled */
+ public boolean isEnabled() {
+ return enabled.get();
+ }
+
+ /**
+ * Records {@code e} as the failure cause, replacing any previously recorded cause.
+ * Passing {@code null} clears the failed state.
+ */
+ public void setFailureCause(RuntimeException e) {
+ failureCause.set(e);
+ }
+
+ /** @return {@code true} if a non-null failure cause has been recorded */
+ public boolean isFailed() {
+ return failureCause.get() != null;
+ }
+
+ /** @return the recorded failure cause, or {@code null} if none */
+ public RuntimeException failureCause() {
+ return failureCause.get();
+ }
+
+ /**
+ * Marks the thread as closed. This only sets a flag: the run loop must observe
+ * {@link #isClosed()} and exit on its own. Safe to call more than once.
+ * NOTE(review): this implements AutoCloseable#close(); consider annotating it with @Override.
+ */
+ public void close() {
+ closed.set(true);
+ }
+
+ /** @return whether close() has been called */
+ public boolean isClosed() {
+ return closed.get();
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 584a03736f9..784907936f4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -86,6 +86,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.ASSIGN_FROM_SUBSCRIBED_ASSIGNORS;
@@ -176,13 +177,51 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
boolean throwOnFetchStableOffsetsUnsupported,
String rackId,
Optional<ClientTelemetryReporter>
clientTelemetryReporter) {
+ this(rebalanceConfig,
+ logContext,
+ client,
+ assignors,
+ metadata,
+ subscriptions,
+ metrics,
+ metricGrpPrefix,
+ time,
+ autoCommitEnabled,
+ autoCommitIntervalMs,
+ interceptors,
+ throwOnFetchStableOffsetsUnsupported,
+ rackId,
+ clientTelemetryReporter,
+ Optional.empty());
+ }
+
+ /**
+ * Initialize the coordination manager.
+ */
+ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
+ LogContext logContext,
+ ConsumerNetworkClient client,
+ List<ConsumerPartitionAssignor> assignors,
+ ConsumerMetadata metadata,
+ SubscriptionState subscriptions,
+ Metrics metrics,
+ String metricGrpPrefix,
+ Time time,
+ boolean autoCommitEnabled,
+ int autoCommitIntervalMs,
+ ConsumerInterceptors<?, ?> interceptors,
+ boolean throwOnFetchStableOffsetsUnsupported,
+ String rackId,
+ Optional<ClientTelemetryReporter>
clientTelemetryReporter,
+ Optional<Supplier<BaseHeartbeatThread>>
heartbeatThreadSupplier) {
super(rebalanceConfig,
logContext,
client,
metrics,
metricGrpPrefix,
time,
- clientTelemetryReporter);
+ clientTelemetryReporter,
+ heartbeatThreadSupplier);
this.rebalanceConfig = rebalanceConfig;
this.log = logContext.logger(ConsumerCoordinator.class);
this.metadata = metadata;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index f144ccf5061..aedc4977a03 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -51,7 +51,6 @@ import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
-import org.apache.kafka.common.test.api.Flaky;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
@@ -75,6 +74,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
import static java.util.Collections.emptyMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -86,6 +86,8 @@ import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
public class AbstractCoordinatorTest {
private static final ByteBuffer EMPTY_DATA = ByteBuffer.wrap(new byte[0]);
@@ -120,15 +122,15 @@ public class AbstractCoordinatorTest {
private void setupCoordinator() {
setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS,
REBALANCE_TIMEOUT_MS,
- Optional.empty());
+ Optional.empty(), Optional.empty());
}
private void setupCoordinator(int retryBackoffMs, int retryBackoffMaxMs) {
setupCoordinator(retryBackoffMs, retryBackoffMaxMs,
REBALANCE_TIMEOUT_MS,
- Optional.empty());
+ Optional.empty(), Optional.empty());
}
- private void setupCoordinator(int retryBackoffMs, int retryBackoffMaxMs,
int rebalanceTimeoutMs, Optional<String> groupInstanceId) {
+ private void setupCoordinator(int retryBackoffMs, int retryBackoffMaxMs,
int rebalanceTimeoutMs, Optional<String> groupInstanceId,
Optional<Supplier<BaseHeartbeatThread>> heartbeatThreadSupplier) {
LogContext logContext = new LogContext();
this.mockTime = new MockTime();
ConsumerMetadata metadata = new ConsumerMetadata(retryBackoffMs,
retryBackoffMaxMs, 60 * 60 * 1000L,
@@ -160,7 +162,8 @@ public class AbstractCoordinatorTest {
this.coordinator = new DummyCoordinator(rebalanceConfig,
consumerClient,
metrics,
- mockTime);
+ mockTime,
+ heartbeatThreadSupplier);
}
private void joinGroup() {
@@ -349,8 +352,7 @@ public class AbstractCoordinatorTest {
@Test
public void testJoinGroupRequestTimeout() {
- setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS,
REBALANCE_TIMEOUT_MS,
- Optional.empty());
+ setupCoordinator();
mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
coordinator.ensureCoordinatorReady(mockTime.timer(0));
@@ -367,7 +369,7 @@ public class AbstractCoordinatorTest {
@Test
public void
testJoinGroupRequestTimeoutLowerBoundedByDefaultRequestTimeout() {
int rebalanceTimeoutMs = REQUEST_TIMEOUT_MS - 10000;
- setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS,
rebalanceTimeoutMs, Optional.empty());
+ setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS,
rebalanceTimeoutMs, Optional.empty(), Optional.empty());
mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
coordinator.ensureCoordinatorReady(mockTime.timer(0));
@@ -387,7 +389,7 @@ public class AbstractCoordinatorTest {
// Ensure we can handle the maximum allowed rebalance timeout
setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS,
Integer.MAX_VALUE,
- Optional.empty());
+ Optional.empty(), Optional.empty());
mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
coordinator.ensureCoordinatorReady(mockTime.timer(0));
@@ -1094,7 +1096,7 @@ public class AbstractCoordinatorTest {
}
private void checkLeaveGroupRequestSent(Optional<String> groupInstanceId) {
- setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS,
Integer.MAX_VALUE, groupInstanceId);
+ setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS,
Integer.MAX_VALUE, groupInstanceId, Optional.empty());
mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId,
leaderId, Errors.NONE));
@@ -1189,7 +1191,7 @@ public class AbstractCoordinatorTest {
private RequestFuture<Void> setupLeaveGroup(LeaveGroupResponse
leaveGroupResponse,
String leaveReason,
String expectedLeaveReason) {
- setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS,
Integer.MAX_VALUE, Optional.empty());
+ setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS,
Integer.MAX_VALUE, Optional.empty(), Optional.empty());
mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId,
leaderId, Errors.NONE));
@@ -1435,10 +1437,10 @@ public class AbstractCoordinatorTest {
awaitFirstHeartbeat(heartbeatReceived);
}
- @Flaky("KAFKA-18310")
@Test
public void testWakeupAfterSyncGroupSentExternalCompletion() throws
Exception {
- setupCoordinator();
+ setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS,
REBALANCE_TIMEOUT_MS,
+ Optional.empty(), Optional.of(() ->
mock(BaseHeartbeatThread.class)));
mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId,
leaderId, Errors.NONE));
@@ -1454,13 +1456,13 @@ public class AbstractCoordinatorTest {
return isSyncGroupRequest;
}
}, syncGroupResponse(Errors.NONE));
- AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
assertThrows(WakeupException.class, () ->
coordinator.ensureActiveGroup(), "Should have woken up from
ensureActiveGroup()");
assertEquals(1, coordinator.onJoinPrepareInvokes);
assertEquals(0, coordinator.onJoinCompleteInvokes);
- assertFalse(heartbeatReceived.get());
+ assertNotNull(coordinator.heartbeatThread());
+ verify(coordinator.heartbeatThread()).enable();
// the join group completes in this poll()
consumerClient.poll(mockTime.timer(0));
@@ -1468,14 +1470,12 @@ public class AbstractCoordinatorTest {
assertEquals(1, coordinator.onJoinPrepareInvokes);
assertEquals(1, coordinator.onJoinCompleteInvokes);
-
- awaitFirstHeartbeat(heartbeatReceived);
}
- @Flaky("KAFKA-18310")
@Test
public void testWakeupAfterSyncGroupReceived() throws Exception {
- setupCoordinator();
+ setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS,
REBALANCE_TIMEOUT_MS,
+ Optional.empty(), Optional.of(() ->
mock(BaseHeartbeatThread.class)));
mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId,
leaderId, Errors.NONE));
@@ -1486,7 +1486,6 @@ public class AbstractCoordinatorTest {
consumerClient.wakeup();
return isSyncGroupRequest;
}, syncGroupResponse(Errors.NONE));
- AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
try {
coordinator.ensureActiveGroup();
@@ -1496,20 +1495,19 @@ public class AbstractCoordinatorTest {
assertEquals(1, coordinator.onJoinPrepareInvokes);
assertEquals(0, coordinator.onJoinCompleteInvokes);
- assertFalse(heartbeatReceived.get());
+ assertNotNull(coordinator.heartbeatThread());
+ verify(coordinator.heartbeatThread()).enable();
coordinator.ensureActiveGroup();
assertEquals(1, coordinator.onJoinPrepareInvokes);
assertEquals(1, coordinator.onJoinCompleteInvokes);
-
- awaitFirstHeartbeat(heartbeatReceived);
}
- @Flaky("KAFKA-15474,KAFKA-18310")
@Test
public void testWakeupAfterSyncGroupReceivedExternalCompletion() throws
Exception {
- setupCoordinator();
+ setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS,
REBALANCE_TIMEOUT_MS,
+ Optional.empty(), Optional.of(() ->
mock(BaseHeartbeatThread.class)));
mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId,
leaderId, Errors.NONE));
@@ -1520,20 +1518,18 @@ public class AbstractCoordinatorTest {
consumerClient.wakeup();
return isSyncGroupRequest;
}, syncGroupResponse(Errors.NONE));
- AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
assertThrows(WakeupException.class, () ->
coordinator.ensureActiveGroup(), "Should have woken up from
ensureActiveGroup()");
assertEquals(1, coordinator.onJoinPrepareInvokes);
assertEquals(0, coordinator.onJoinCompleteInvokes);
- assertFalse(heartbeatReceived.get());
+ assertNotNull(coordinator.heartbeatThread());
+ verify(coordinator.heartbeatThread()).enable();
coordinator.ensureActiveGroup();
assertEquals(1, coordinator.onJoinPrepareInvokes);
assertEquals(1, coordinator.onJoinCompleteInvokes);
-
- awaitFirstHeartbeat(heartbeatReceived);
}
@Test
@@ -1707,8 +1703,9 @@ public class AbstractCoordinatorTest {
DummyCoordinator(GroupRebalanceConfig rebalanceConfig,
ConsumerNetworkClient client,
Metrics metrics,
- Time time) {
- super(rebalanceConfig, new LogContext(), client, metrics,
METRIC_GROUP_PREFIX, time);
+ Time time,
+ Optional<Supplier<BaseHeartbeatThread>>
heartbeatThreadSupplier) {
+ super(rebalanceConfig, new LogContext(), client, metrics,
METRIC_GROUP_PREFIX, time, Optional.empty(), heartbeatThreadSupplier);
}
@Override
@@ -1750,5 +1747,4 @@ public class AbstractCoordinatorTest {
onJoinCompleteInvokes++;
}
}
-
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/BaseHeartbeatThreadTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/BaseHeartbeatThreadTest.java
new file mode 100644
index 00000000000..359a58ea261
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/BaseHeartbeatThreadTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests for the state flags of BaseHeartbeatThread. The threads are never started;
+ * try-with-resources only invokes close(), which sets the closed flag.
+ */
+public class BaseHeartbeatThreadTest {
+
+ /** A new thread starts disabled; enable() and disable() toggle isEnabled(). */
+ @Test
+ public void testIsEnabled() {
+ try (BaseHeartbeatThread baseHeartbeatThread = new
BaseHeartbeatThread("test", true)) {
+ assertFalse(baseHeartbeatThread.isEnabled());
+
+ baseHeartbeatThread.enable();
+ assertTrue(baseHeartbeatThread.isEnabled());
+
+ baseHeartbeatThread.disable();
+ assertFalse(baseHeartbeatThread.isEnabled());
+ }
+ }
+
+ /** A new thread has no failure cause; setFailureCause() makes isFailed() true and returns the same instance. */
+ @Test
+ public void testIsFailed() {
+ try (BaseHeartbeatThread baseHeartbeatThread = new
BaseHeartbeatThread("test", true)) {
+ assertFalse(baseHeartbeatThread.isFailed());
+ assertNull(baseHeartbeatThread.failureCause());
+
+ FencedInstanceIdException exception = new
FencedInstanceIdException("test");
+ baseHeartbeatThread.setFailureCause(exception);
+ assertTrue(baseHeartbeatThread.isFailed());
+ assertEquals(exception, baseHeartbeatThread.failureCause());
+ }
+ }
+
+ /** A new thread is not closed; close() sets the flag (try-with-resources then calls close() a second time). */
+ @Test
+ public void testIsClosed() {
+ try (BaseHeartbeatThread baseHeartbeatThread = new
BaseHeartbeatThread("test", true)) {
+ assertFalse(baseHeartbeatThread.isClosed());
+
+ baseHeartbeatThread.close();
+ assertTrue(baseHeartbeatThread.isClosed());
+ }
+ }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 3dd91dc7639..c41ea0029ba 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -76,7 +76,6 @@ import
org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
-import org.apache.kafka.common.test.api.Flaky;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
@@ -1010,10 +1009,9 @@ public abstract class ConsumerCoordinatorTest {
assertEquals(getAdded(owned, assigned), rebalanceListener.assigned);
}
-
- @Flaky("KAFKA-15900")
@Test
public void testOutdatedCoordinatorAssignment() {
+ createMockHeartbeatThreadCoordinator();
final String consumerId = "outdated_assignment";
final List<TopicPartition> owned = Collections.emptyList();
final List<String> oldSubscription = singletonList(topic2);
@@ -4145,4 +4143,28 @@ public abstract class ConsumerCoordinatorTest {
return super.assign(partitionsPerTopic, subscriptions);
}
}
+
+ /**
+ * Closes the current {@code metrics} and {@code coordinator}, then replaces them with fresh instances.
+ * The new coordinator is given a heartbeat-thread supplier that returns a Mockito mock of
+ * BaseHeartbeatThread. As a result, it never constructs its built-in heartbeat thread, and calls such as
+ * start()/enable() go to the mock (no-ops under Mockito's default answers). This keeps a real
+ * background heartbeat thread from racing the test.
+ */
+ private void createMockHeartbeatThreadCoordinator() {
+ metrics.close();
+ coordinator.close(time.timer(0));
+
+ metrics = new Metrics(time);
+ coordinator = new ConsumerCoordinator(
+ rebalanceConfig,
+ new LogContext(),
+ consumerClient,
+ assignors,
+ metadata,
+ subscriptions,
+ metrics,
+ consumerId + groupId, // metricGrpPrefix
+ time,
+ false, // autoCommitEnabled
+ autoCommitIntervalMs,
+ null, // interceptors
+ false, // throwOnFetchStableOffsetsUnsupported
+ null, // rackId
+ Optional.empty(), // no client telemetry reporter
+ Optional.of(() -> Mockito.mock(BaseHeartbeatThread.class)));
+ }
}