Copilot commented on code in PR #17855:
URL: https://github.com/apache/pinot/pull/17855#discussion_r2921210093
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/server/EmbeddedKafkaCluster.java:
##########
@@ -46,12 +46,19 @@ public class EmbeddedKafkaCluster implements StreamDataServerStartable {
   private static final int TOPIC_MUTATION_RETRIES = 5;
   private int _brokerCount = 1;
+  private final Properties _extraConfigProps = new Properties();
   private KafkaClusterTestKit _cluster;
   private String _bootstrapServers;
   @Override
   public void init(Properties props) {
     _brokerCount = Integer.parseInt(props.getProperty(BROKER_COUNT_PROP, "1"));
+    // Forward any additional properties (excluding our internal ones) as Kafka broker config
+    for (String key : props.stringPropertyNames()) {
+      if (!key.equals(BROKER_COUNT_PROP)) {
+        _extraConfigProps.setProperty(key, props.getProperty(key));
+      }
+    }
Review Comment:
`init()` populates `_extraConfigProps` but never clears it. If the same
`EmbeddedKafkaCluster` instance is reused across multiple init/start cycles,
broker config keys removed from later `props` will still remain set from
earlier calls, leading to stale/unexpected broker configuration. Clear
`_extraConfigProps` at the start of `init()` (or reconstruct it) before copying
properties.
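A minimal sketch of the suggested fix (the holder class and accessor below are illustrative stand-ins, not the actual `EmbeddedKafkaCluster` code): clear the accumulated properties at the top of `init()` before copying, so a reused instance never carries stale broker config.

```java
import java.util.Properties;

// Illustrative stand-in for EmbeddedKafkaCluster's init() logic; names are hypothetical.
class ExtraConfigHolder {
  private static final String BROKER_COUNT_PROP = "brokerCount";
  private final Properties _extraConfigProps = new Properties();

  public void init(Properties props) {
    // Drop config left over from any previous init/start cycle before copying.
    _extraConfigProps.clear();
    for (String key : props.stringPropertyNames()) {
      if (!key.equals(BROKER_COUNT_PROP)) {
        _extraConfigProps.setProperty(key, props.getProperty(key));
      }
    }
  }

  public Properties getExtraConfigProps() {
    return _extraConfigProps;
  }
}
```

With the `clear()` in place, a key present only in the first `init()` call no longer survives a second call.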
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java:
##########
@@ -103,6 +104,18 @@ public void testValues()
         .getLong(0), 53);
   }
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    dropRealtimeTable(getTableName());
+    stopServer();
+    stopBroker();
+    stopController();
+    stopKafka();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);
Review Comment:
The added `@AfterClass tearDown()` runs cleanup sequentially without
`try/finally`. If `dropRealtimeTable()` or one of the stop calls throws,
Kafka/ZK may remain running and `_tempDir` won’t be deleted, which can break
later integration tests. Consider wrapping teardown in `try { ... } finally {
... }` and doing `deleteQuietly(_tempDir)` in the `finally` block.
```suggestion
    try {
      dropRealtimeTable(getTableName());
      stopServer();
      stopBroker();
      stopController();
      stopKafka();
      stopZk();
    } finally {
      FileUtils.deleteQuietly(_tempDir);
    }
```
##########
pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkUpsertTableIntegrationTest.java:
##########
@@ -223,6 +224,19 @@ private TableConfig setupTable(String tableName, String kafkaTopicName, String i
     return tableConfig;
   }
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    dropRealtimeTable(getTableName());
+    stopMinion();
+    stopServer();
+    stopBroker();
+    stopController();
+    stopKafka();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);
Review Comment:
The new `@AfterClass tearDown()` doesn’t use a `try/finally` guard. If
dropping the table or stopping components fails, Kafka/ZK/minion may be left
running and the temp directory won’t be cleaned up, which can cause
port/resource leaks across the test suite. Use a `try/finally` teardown pattern
and delete `_tempDir` in the `finally` (prefer `deleteQuietly`).
```suggestion
    try {
      dropRealtimeTable(getTableName());
      stopMinion();
      stopServer();
      stopBroker();
      stopController();
      stopKafka();
      stopZk();
    } finally {
      FileUtils.deleteQuietly(_tempDir);
    }
```
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BrokerQueryLimitTest.java:
##########
@@ -170,13 +170,13 @@ public void tearDown()
     dropOfflineTable(getTableName());
     FileUtils.deleteDirectory(_tempDir);
-    // Stop Kafka
-    LOGGER.warn("Stop Kafka in the integration test class");
-    stopKafka();
     // Shutdown the Pinot cluster
     stopServer();
     stopBroker();
Review Comment:
This teardown deletes `_tempDir` before stopping server/broker/controller,
and then deletes it again at the end. Deleting the temp directory while
components are still shutting down can make shutdown less reliable, and the
second delete is redundant. Consider stopping all components first and doing a
single temp-dir cleanup in a `finally` block (prefer `deleteQuietly`).
##########
pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java:
##########
@@ -189,6 +189,13 @@ protected int getNumKafkaBrokers() {
     return useKafkaTransaction() ? DEFAULT_TRANSACTION_NUM_KAFKA_BROKERS : DEFAULT_LLC_NUM_KAFKA_BROKERS;
   }
+  /**
+   * Override to pass extra Kafka broker config properties when starting the embedded Kafka cluster.
+   */
+  protected Properties getKafkaExtraProperties() {
+    return new Properties();
+  }
Review Comment:
This PR introduces a new extension point (`getKafkaExtraProperties()`) and
wires it through to the embedded Kafka broker config. That’s a functional
change beyond startup/teardown ordering; please update the PR description to
mention this addition (or split it into a separate PR) so reviewers understand
the motivation/scope (e.g., required for ExactlyOnce test stability).
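To make the new extension point concrete, here is a hedged, self-contained sketch of how a subclass might use it. `BaseTest` below stands in for `BaseClusterIntegrationTest` (which isn't reproduced here), and the overriding class is hypothetical; the property keys are standard Kafka broker configs often lowered to `1` for single-broker transactional tests.

```java
import java.util.Properties;

// Standalone mirror of the new hook; BaseTest stands in for BaseClusterIntegrationTest.
abstract class BaseTest {
  /** Override to pass extra Kafka broker config properties to the embedded cluster. */
  protected Properties getKafkaExtraProperties() {
    return new Properties();
  }
}

// Hypothetical subclass: a transactional (exactly-once) test could relax the
// transaction-state-log requirements so a one-broker embedded cluster works.
class ExactlyOnceStyleTest extends BaseTest {
  @Override
  protected Properties getKafkaExtraProperties() {
    Properties props = new Properties();
    props.setProperty("transaction.state.log.replication.factor", "1");
    props.setProperty("transaction.state.log.min.isr", "1");
    return props;
  }
}
```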
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RetentionManagerIntegrationTest.java:
##########
@@ -95,6 +97,18 @@ protected void setupTable()
     waitForDocsLoaded(600_000L, true, tableConfig.getTableName());
   }
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    dropRealtimeTable(getTableName());
+    stopServer();
+    stopBroker();
+    stopController();
+    stopKafka();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);
Review Comment:
The new `@AfterClass tearDown()` does not use `try/finally`. If
`dropRealtimeTable()` (or any stop method) throws, later shutdown steps
(notably `stopKafka()/stopZk()`) won’t run, which can leak processes/ports and
make subsequent integration tests flaky. Wrap teardown in `try { ... } finally
{ ... }` and prefer `FileUtils.deleteQuietly(_tempDir)` in the `finally` block.
```suggestion
    try {
      dropRealtimeTable(getTableName());
      stopServer();
      stopBroker();
      stopController();
      stopKafka();
      stopZk();
    } finally {
      FileUtils.deleteQuietly(_tempDir);
    }
```
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryWorkloadIntegrationTest.java:
##########
@@ -115,6 +117,19 @@ public void setUp()
     waitForAllDocsLoaded(100_000L);
   }
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    dropOfflineTable(getTableName());
+    dropRealtimeTable(getTableName());
+    stopServer();
+    stopBroker();
+    stopController();
+    stopKafka();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);
Review Comment:
The new `@AfterClass tearDown()` does not guard cleanup with `try/finally`.
If table drops or any `stop*()` call fails, Kafka/ZK can be left running and
`_tempDir` may not be cleaned, causing follow-on integration tests to fail. Use
a `try/finally` teardown pattern and delete `_tempDir` in the `finally`
(ideally `deleteQuietly`).
```suggestion
    try {
      dropOfflineTable(getTableName());
      dropRealtimeTable(getTableName());
      stopServer();
      stopBroker();
      stopController();
      stopKafka();
      stopZk();
    } finally {
      FileUtils.deleteQuietly(_tempDir);
    }
```
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java:
##########
@@ -165,9 +172,9 @@ private void waitForCommittedRecordsVisible(String brokerList) {
     // Final diagnostic dump
     lastUncommitted = countRecords(brokerList, "read_uncommitted");
-    System.err.println("[ExactlyOnce] VERIFICATION FAILED after 60s: read_committed=" + lastCommitted
+    System.err.println("[ExactlyOnce] VERIFICATION FAILED after 120s: read_committed=" + lastCommitted
         + ", read_uncommitted=" + lastUncommitted);
-    throw new AssertionError("[ExactlyOnce] Transaction markers were not propagated within 60s; "
+    throw new AssertionError("[ExactlyOnce] Transaction markers were not propagated within 120s; "
Review Comment:
This test prints diagnostics via `System.err` in a failure path. Project
guidelines prefer SLF4J logging over direct stdout/stderr; using a logger at
`ERROR` level (or TestNG’s reporting facilities) will also integrate better
with CI log collection/formatting. Consider replacing `System.err.println` with
structured logging and including the relevant counters as fields.
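A self-contained sketch of the suggested change. The review recommends SLF4J, which isn't on this sketch's classpath, so `java.util.logging` stands in; with SLF4J the call would instead be `LOGGER.error("... read_committed={}, read_uncommitted={}", lastCommitted, lastUncommitted)`. The class and method names here are illustrative, not the test's actual code.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical extraction of the failure-path diagnostics, logged instead of
// written to System.err. JUL plays the role of SLF4J for a runnable example.
class ExactlyOnceDiagnostics {
  private static final Logger LOGGER = Logger.getLogger(ExactlyOnceDiagnostics.class.getName());

  static String buildFailureMessage(long committed, long uncommitted) {
    return "[ExactlyOnce] VERIFICATION FAILED after 120s: read_committed=" + committed
        + ", read_uncommitted=" + uncommitted;
  }

  static void reportFailure(long committed, long uncommitted) {
    // Structured, leveled logging integrates with CI log collection,
    // unlike raw stderr writes.
    LOGGER.log(Level.SEVERE, "{0}", buildFailureMessage(committed, uncommitted));
  }
}
```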
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BrokerQueryLimitTest.java:
##########
@@ -170,13 +170,13 @@ public void tearDown()
     dropOfflineTable(getTableName());
     FileUtils.deleteDirectory(_tempDir);
-    // Stop Kafka
-    LOGGER.warn("Stop Kafka in the integration test class");
-    stopKafka();
     // Shutdown the Pinot cluster
     stopServer();
     stopBroker();
     stopController();
+    // Stop Kafka
+    LOGGER.warn("Stop Kafka in the integration test class");
+    stopKafka();
     stopZk();
     FileUtils.deleteDirectory(_tempDir);
     LOGGER.warn("Finished tearing down integration test class: {}", getClass().getSimpleName());
Review Comment:
`FileUtils.deleteDirectory(_tempDir)` is called twice in this method. Even
if the second call is typically a no-op when the directory no longer exists,
it’s redundant and makes the teardown harder to reason about. Consider removing
the duplicate and consolidating cleanup into a single location (ideally in
`finally`).
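A generic, self-contained sketch of the consolidated pattern these comments converge on: run all stop steps inside `try`, then delete the temp directory exactly once in `finally` without throwing. The `stop*` calls are placeholders for the Pinot test-base hooks, and `quietDelete` emulates commons-io's `FileUtils.deleteQuietly` with the JDK only.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Comparator;

// Hypothetical teardown helper; stop steps stand in for dropOfflineTable,
// stopServer, stopBroker, stopController, stopKafka, stopZk.
class TeardownPattern {
  static void tearDown(File tempDir, Runnable... stopSteps) {
    try {
      for (Runnable step : stopSteps) {
        step.run();
      }
    } finally {
      quietDelete(tempDir);  // single cleanup point, runs even if a stop step throws
    }
  }

  // Emulates FileUtils.deleteQuietly: recursive delete, never throws.
  static boolean quietDelete(File dir) {
    if (dir == null || !dir.exists()) {
      return false;
    }
    try {
      Files.walk(dir.toPath())
          .sorted(Comparator.reverseOrder())  // children before parents
          .forEach(p -> p.toFile().delete());
      return !dir.exists();
    } catch (IOException e) {
      return false;  // swallow, like deleteQuietly
    }
  }
}
```

The key property is that a failing stop step still propagates its exception (so the test run reports it), while the temp-dir cleanup happens regardless and is attempted only once.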
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]