viktorsomogyi commented on code in PR #12290:
URL: https://github.com/apache/kafka/pull/12290#discussion_r1234081443
##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##########
@@ -350,13 +353,16 @@ private void assertRequestTimesOut(String
requestDescription, ThrowingRunnable r
}
private static class Block {
- private static CountDownLatch blockLatch;
+ // All latches that blocking connectors/tasks are or will be waiting
on during a test case
+ private static final Set<CountDownLatch> BLOCK_LATCHES = new
HashSet<>();
+ // The latch that can be used to wait for a connector/task to reach
the most-recently-registered blocking point
+ private static CountDownLatch awaitBlockLatch;
private final String block;
public static final String BLOCK_CONFIG = "block";
- private static ConfigDef config() {
+ public static ConfigDef config() {
Review Comment:
nit: The default (no modifier) should be sufficient. Any particular reasons
for making this public?
##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##########
@@ -368,31 +374,54 @@ private static ConfigDef config() {
);
}
+ /**
+ * {@link CountDownLatch#await() Wait} for the connector/task to reach
the point in its lifecycle where
+ * it will block.
+ */
public static void waitForBlock() throws InterruptedException,
TimeoutException {
+ CountDownLatch awaitBlockLatch;
synchronized (Block.class) {
- if (blockLatch == null) {
- throw new IllegalArgumentException("No connector has been
created yet");
- }
+ awaitBlockLatch = Block.awaitBlockLatch;
+ }
+
+ if (awaitBlockLatch == null) {
+ throw new IllegalArgumentException("No connector has been
created yet");
}
log.debug("Waiting for connector to block");
- if (!blockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS,
TimeUnit.MILLISECONDS)) {
+ if (!awaitBlockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS,
TimeUnit.MILLISECONDS)) {
throw new TimeoutException("Timed out waiting for connector to
block.");
}
log.debug("Connector should now be blocked");
}
- // Note that there is only ever at most one global block latch at a
time, which makes tests that
+ /**
+ * {@link CountDownLatch#countDown() Release} any latches allocated
over the course of a test
+ * to either await a connector/task reaching a blocking point, or
cause a connector/task to block.
+ */
+ public static synchronized void reset() {
Review Comment:
nit: no need for `public`
##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##########
@@ -401,23 +430,29 @@ public Block(Map<String, String> props) {
public Block(String block) {
this.block = block;
- synchronized (Block.class) {
- if (blockLatch != null) {
- blockLatch.countDown();
+ if (block != null) {
+ synchronized (Block.class) {
+ resetAwaitBlockLatch();
+ awaitBlockLatch = new CountDownLatch(1);
}
- blockLatch = new CountDownLatch(1);
}
}
public void maybeBlockOn(String block) {
if (block.equals(this.block)) {
log.info("Will block on {}", block);
- blockLatch.countDown();
+ CountDownLatch blockLatch;
+ synchronized (Block.class) {
+ awaitBlockLatch.countDown();
+ blockLatch = newBlockLatch();
+ }
while (true) {
try {
- Thread.sleep(Long.MAX_VALUE);
+ blockLatch.await();
+ log.debug("Instructed to stop blocking; will resume
normal execution");
+ return;
} catch (InterruptedException e) {
- // No-op. Just keep blocking.
+ log.debug("Interrupted while blocking; will continue
blocking until instructed to stop");
}
}
Review Comment:
Wouldn't this while block prevent the normal shutdown of connect based on
the order in `BlockingConnectorTest` (you call `connect.stop` and then
`Block.reset`)? For instance the way Worker is shutting down they expect
WorkerConnectors to respond to an interrupt.
Code reference:
https://github.com/apache/kafka/blob/6f7682d2f4ecc8110f80cb6301de02f512d36a53/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L267
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]