Copilot commented on code in PR #17855:
URL: https://github.com/apache/pinot/pull/17855#discussion_r2921210093


##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/server/EmbeddedKafkaCluster.java:
##########
@@ -46,12 +46,19 @@ public class EmbeddedKafkaCluster implements StreamDataServerStartable {
   private static final int TOPIC_MUTATION_RETRIES = 5;
 
   private int _brokerCount = 1;
+  private final Properties _extraConfigProps = new Properties();
   private KafkaClusterTestKit _cluster;
   private String _bootstrapServers;
 
   @Override
   public void init(Properties props) {
     _brokerCount = Integer.parseInt(props.getProperty(BROKER_COUNT_PROP, "1"));
+    // Forward any additional properties (excluding our internal ones) as Kafka broker config
+    for (String key : props.stringPropertyNames()) {
+      if (!key.equals(BROKER_COUNT_PROP)) {
+        _extraConfigProps.setProperty(key, props.getProperty(key));
+      }
+    }

Review Comment:
   `init()` populates `_extraConfigProps` but never clears it. If the same 
`EmbeddedKafkaCluster` instance is reused across multiple init/start cycles, 
broker config keys removed from later `props` will still remain set from 
earlier calls, leading to stale/unexpected broker configuration. Clear 
`_extraConfigProps` at the start of `init()` (or reconstruct it) before copying 
properties.
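
A minimal sketch of the fix described above, assuming a simplified stand-in class (`ExtraConfigHolder` is hypothetical and models only the property-forwarding part of `EmbeddedKafkaCluster`; the value of `BROKER_COUNT_PROP` is also an assumption):

```java
import java.util.Properties;

// Hypothetical stand-in for EmbeddedKafkaCluster; only the config forwarding is modeled.
class ExtraConfigHolder {
  // Assumed value; the real constant is defined elsewhere in the PR.
  static final String BROKER_COUNT_PROP = "kafka.broker.count";

  private int _brokerCount = 1;
  private final Properties _extraConfigProps = new Properties();

  public void init(Properties props) {
    // Drop anything left over from an earlier init() call before copying.
    _extraConfigProps.clear();
    _brokerCount = Integer.parseInt(props.getProperty(BROKER_COUNT_PROP, "1"));
    for (String key : props.stringPropertyNames()) {
      if (!key.equals(BROKER_COUNT_PROP)) {
        _extraConfigProps.setProperty(key, props.getProperty(key));
      }
    }
  }

  int getBrokerCount() {
    return _brokerCount;
  }

  // Returns a copy so callers cannot mutate the internal state.
  Properties getExtraConfigProps() {
    Properties copy = new Properties();
    copy.putAll(_extraConfigProps);
    return copy;
  }
}
```

With the `clear()` call in place, a second `init()` that omits a key set by the first no longer carries that key forward.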



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java:
##########
@@ -103,6 +104,18 @@ public void testValues()
         .getLong(0), 53);
   }
 
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    dropRealtimeTable(getTableName());
+    stopServer();
+    stopBroker();
+    stopController();
+    stopKafka();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);

Review Comment:
   The added `@AfterClass tearDown()` runs cleanup sequentially without 
`try/finally`. If `dropRealtimeTable()` or one of the stop calls throws, 
Kafka/ZK may remain running and `_tempDir` won’t be deleted, which can break 
later integration tests. Consider wrapping teardown in `try { ... } finally { 
... }` and calling `FileUtils.deleteQuietly(_tempDir)` in the `finally` block so 
the temp directory is always removed. Note that this alone does not guarantee 
that the remaining stop calls run after a failure; each would need its own guard.
   ```suggestion
       try {
         dropRealtimeTable(getTableName());
         stopServer();
         stopBroker();
         stopController();
         stopKafka();
         stopZk();
       } finally {
         FileUtils.deleteQuietly(_tempDir);
       }
   ```
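
The `try/finally` in the suggestion above guarantees only the temp-directory cleanup: if `stopServer()` throws, `stopKafka()` and `stopZk()` are still skipped. One way to attempt every step is sketched below. `TeardownSteps` and its `Step` interface are hypothetical helpers, not part of Pinot or the PR.

```java
// Hypothetical helper: runs every teardown step even if earlier ones fail.
class TeardownSteps {
  @FunctionalInterface
  interface Step {
    void run() throws Exception;
  }

  // Runs all steps in order. The first failure is rethrown at the end,
  // with any later failures attached to it as suppressed exceptions.
  static void runAll(Step... steps) throws Exception {
    Exception first = null;
    for (Step step : steps) {
      try {
        step.run();
      } catch (Exception e) {
        if (first == null) {
          first = e;
        } else {
          first.addSuppressed(e);
        }
      }
    }
    if (first != null) {
      throw first;
    }
  }
}
```

In a test class this could be called as `TeardownSteps.runAll(() -> dropRealtimeTable(getTableName()), this::stopServer, this::stopBroker, this::stopController, this::stopKafka, this::stopZk, () -> FileUtils.deleteQuietly(_tempDir))`, assuming those methods have compatible signatures.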



##########
pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkUpsertTableIntegrationTest.java:
##########
@@ -223,6 +224,19 @@ private TableConfig setupTable(String tableName, String kafkaTopicName, String i
     return tableConfig;
   }
 
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    dropRealtimeTable(getTableName());
+    stopMinion();
+    stopServer();
+    stopBroker();
+    stopController();
+    stopKafka();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);

Review Comment:
   The new `@AfterClass tearDown()` doesn’t use a `try/finally` guard. If 
dropping the table or stopping components fails, Kafka/ZK/minion may be left 
running and the temp directory won’t be cleaned up, which can cause 
port/resource leaks across the test suite. Use a `try/finally` teardown pattern 
and delete `_tempDir` in the `finally` (prefer `deleteQuietly`).
   ```suggestion
       try {
         dropRealtimeTable(getTableName());
         stopMinion();
         stopServer();
         stopBroker();
         stopController();
         stopKafka();
         stopZk();
       } finally {
         FileUtils.deleteQuietly(_tempDir);
       }
   ```



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BrokerQueryLimitTest.java:
##########
@@ -170,13 +170,13 @@ public void tearDown()
     dropOfflineTable(getTableName());
     FileUtils.deleteDirectory(_tempDir);
 
-    // Stop Kafka
-    LOGGER.warn("Stop Kafka in the integration test class");
-    stopKafka();
     // Shutdown the Pinot cluster
     stopServer();
     stopBroker();

Review Comment:
   This teardown deletes `_tempDir` before stopping server/broker/controller, 
and then deletes it again at the end. Deleting the temp directory while 
components are still shutting down can make shutdown less reliable, and the 
second delete is redundant. Consider stopping all components first and doing a 
single temp-dir cleanup in a `finally` block (prefer `deleteQuietly`).
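
The ordering described above (stop everything first, then clean up the temp directory once) can be sketched with the JDK alone. `OrderedTeardown` is a hypothetical class, and its `deleteQuietly` is a stand-in written with `java.nio.file`, not the Commons IO method:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical sketch: components are stopped first, the temp dir is removed once.
class OrderedTeardown {
  // Best-effort recursive delete that reports failure instead of throwing.
  static boolean deleteQuietly(Path dir) {
    if (dir == null || !Files.exists(dir)) {
      return false;
    }
    try (Stream<Path> paths = Files.walk(dir)) {
      // Reverse order so that children are deleted before their parents.
      paths.sorted(Comparator.reverseOrder()).forEach(p -> {
        try {
          Files.delete(p);
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      });
      return true;
    } catch (IOException | UncheckedIOException e) {
      return false;
    }
  }

  // Stops components, then deletes the temp dir exactly once, even on failure.
  static void tearDown(Runnable stopComponents, Path tempDir) {
    try {
      stopComponents.run();
    } finally {
      deleteQuietly(tempDir);
    }
  }
}
```

Because the delete happens only in `finally`, no component sees its working directory disappear while it is still shutting down, and there is a single place to look for cleanup.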



##########
pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java:
##########
@@ -189,6 +189,13 @@ protected int getNumKafkaBrokers() {
     return useKafkaTransaction() ? DEFAULT_TRANSACTION_NUM_KAFKA_BROKERS : DEFAULT_LLC_NUM_KAFKA_BROKERS;
   }
 
+  /**
+   * Override to pass extra Kafka broker config properties when starting the embedded Kafka cluster.
+   */
+  protected Properties getKafkaExtraProperties() {
+    return new Properties();
+  }

Review Comment:
   This PR introduces a new extension point (`getKafkaExtraProperties()`) and 
wires it through to the embedded Kafka broker config. That’s a functional 
change beyond startup/teardown ordering; please update the PR description to 
mention this addition (or split it into a separate PR) so reviewers understand 
the motivation/scope (e.g., required for ExactlyOnce test stability).



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RetentionManagerIntegrationTest.java:
##########
@@ -95,6 +97,18 @@ protected void setupTable()
     waitForDocsLoaded(600_000L, true, tableConfig.getTableName());
   }
 
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    dropRealtimeTable(getTableName());
+    stopServer();
+    stopBroker();
+    stopController();
+    stopKafka();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);

Review Comment:
   The new `@AfterClass tearDown()` does not use `try/finally`. If 
`dropRealtimeTable()` (or any stop method) throws, later shutdown steps 
(notably `stopKafka()/stopZk()`) won’t run, which can leak processes/ports and 
make subsequent integration tests flaky. Wrap teardown in `try { ... } finally 
{ ... }` and prefer `FileUtils.deleteQuietly(_tempDir)` in the `finally` block.
   ```suggestion
       try {
         dropRealtimeTable(getTableName());
         stopServer();
         stopBroker();
         stopController();
         stopKafka();
         stopZk();
       } finally {
         FileUtils.deleteQuietly(_tempDir);
       }
   ```



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryWorkloadIntegrationTest.java:
##########
@@ -115,6 +117,19 @@ public void setUp()
     waitForAllDocsLoaded(100_000L);
   }
 
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    dropOfflineTable(getTableName());
+    dropRealtimeTable(getTableName());
+    stopServer();
+    stopBroker();
+    stopController();
+    stopKafka();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);

Review Comment:
   The new `@AfterClass tearDown()` does not guard cleanup with `try/finally`. 
If table drops or any `stop*()` call fails, Kafka/ZK can be left running and 
`_tempDir` may not be cleaned, causing follow-on integration tests to fail. Use 
a `try/finally` teardown pattern and delete `_tempDir` in the `finally` 
(ideally `deleteQuietly`).
   ```suggestion
       try {
         dropOfflineTable(getTableName());
         dropRealtimeTable(getTableName());
         stopServer();
         stopBroker();
         stopController();
         stopKafka();
         stopZk();
       } finally {
         FileUtils.deleteQuietly(_tempDir);
       }
   ```



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java:
##########
@@ -165,9 +172,9 @@ private void waitForCommittedRecordsVisible(String brokerList) {
 
     // Final diagnostic dump
     lastUncommitted = countRecords(brokerList, "read_uncommitted");
-    System.err.println("[ExactlyOnce] VERIFICATION FAILED after 60s: read_committed=" + lastCommitted
+    System.err.println("[ExactlyOnce] VERIFICATION FAILED after 120s: read_committed=" + lastCommitted
         + ", read_uncommitted=" + lastUncommitted);
-    throw new AssertionError("[ExactlyOnce] Transaction markers were not propagated within 60s; "
+    throw new AssertionError("[ExactlyOnce] Transaction markers were not propagated within 120s; "

Review Comment:
   This test prints diagnostics via `System.err` in a failure path. Project 
guidelines prefer SLF4J logging over direct stdout/stderr; using a logger at 
`ERROR` level (or TestNG’s reporting facilities) will also integrate better 
with CI log collection/formatting. Consider replacing `System.err.println` with 
a parameterized SLF4J `error(...)` call that passes the relevant counters as 
`{}` placeholder arguments.



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BrokerQueryLimitTest.java:
##########
@@ -170,13 +170,13 @@ public void tearDown()
     dropOfflineTable(getTableName());
     FileUtils.deleteDirectory(_tempDir);
 
-    // Stop Kafka
-    LOGGER.warn("Stop Kafka in the integration test class");
-    stopKafka();
     // Shutdown the Pinot cluster
     stopServer();
     stopBroker();
     stopController();
+    // Stop Kafka
+    LOGGER.warn("Stop Kafka in the integration test class");
+    stopKafka();
     stopZk();
     FileUtils.deleteDirectory(_tempDir);
     LOGGER.warn("Finished tearing down integration test class: {}", getClass().getSimpleName());

Review Comment:
   `FileUtils.deleteDirectory(_tempDir)` is called twice in this method. Even 
if the second call is typically a no-op when the directory no longer exists, 
it’s redundant and makes the teardown harder to reason about. Consider removing 
the duplicate and consolidating cleanup into a single location (ideally in 
`finally`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

