This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f641a9c1a7a Fix EmbeddedKafkaCluster startup/teardown ordering in
integration tests (#17855)
f641a9c1a7a is described below
commit f641a9c1a7a20a23d69b1b79d607fc5ba85ef2a8
Author: Xiang Fu <[email protected]>
AuthorDate: Wed Mar 11 22:00:02 2026 -0700
Fix EmbeddedKafkaCluster startup/teardown ordering in integration tests
(#17855)
* Fix Kafka startup/teardown ordering in integration tests
- Add getKafkaExtraProperties() hook in BaseClusterIntegrationTest for
subclasses to pass custom Kafka broker config
- Update EmbeddedKafkaCluster to forward extra config properties to
KafkaClusterTestKit builder
- Set log.flush.interval.messages=1 in ExactlyOnceKafka test to ensure
transactional data is flushed to disk immediately
- Raise the committed-record visibility deadline from 60s to 120s and
update the timeout log/assertion messages to say "120s" accordingly
- Add retry logic for realtime table creation when Kafka topic metadata
is not yet available
Co-Authored-By: Claude Opus 4.6 <[email protected]>
* Address review comments: try/finally teardown, SLF4J logging, clear extra
props
- Wrap all @AfterClass tearDown methods in try/finally with
FileUtils.deleteQuietly for reliable temp directory cleanup
- Fix BrokerQueryLimitTest duplicate deleteDirectory and wrong ordering
- Replace System.err.println with SLF4J LOGGER in ExactlyOnce test
- Clear _extraConfigProps at start of init() in EmbeddedKafkaCluster
Co-Authored-By: Claude Opus 4.6 <[email protected]>
---------
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../sink/PinotSinkUpsertTableIntegrationTest.java | 23 ++++++++++--
.../tests/BaseClusterIntegrationTest.java | 8 +++++
.../tests/BasePauselessRealtimeIngestionTest.java | 2 +-
.../tests/BaseRealtimeClusterIntegrationTest.java | 5 ++-
.../integration/tests/BrokerQueryLimitTest.java | 22 ++++++------
.../tests/CLPEncodingRealtimeIntegrationTest.java | 20 +++++++++--
...tlyOnceKafkaRealtimeClusterIntegrationTest.java | 42 +++++++++++++---------
.../tests/NullHandlingIntegrationTest.java | 5 ++-
...PartialUpsertTableRebalanceIntegrationTest.java | 5 ++-
...sRealtimeIngestionSegmentCommitFailureTest.java | 3 +-
.../tests/QueryWorkloadIntegrationTest.java | 20 ++++++++++-
.../tests/RetentionManagerIntegrationTest.java | 19 +++++++++-
.../tests/StaleSegmentCheckIntegrationTest.java | 5 +--
.../TableRebalancePauselessIntegrationTest.java | 1 +
.../CustomDataQueryClusterIntegrationTest.java | 6 ++--
.../BaseLogicalTableIntegrationTest.java | 6 ++--
.../kafka30/server/EmbeddedKafkaCluster.java | 19 ++++++++--
17 files changed, 153 insertions(+), 58 deletions(-)
diff --git
a/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkUpsertTableIntegrationTest.java
b/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkUpsertTableIntegrationTest.java
index 42d7405d3de..642b54dc7d1 100644
---
a/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkUpsertTableIntegrationTest.java
+++
b/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkUpsertTableIntegrationTest.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
@@ -52,6 +53,7 @@ import
org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -77,15 +79,14 @@ public class PinotSinkUpsertTableIntegrationTest extends
BaseClusterIntegrationT
// Start the Pinot cluster
startZk();
+ // Start Kafka and push data into Kafka
+ startKafka();
// Start a customized controller with more frequent realtime segment
validation
startController();
startBroker();
startServers(2);
startMinion();
- // Start Kafka and push data into Kafka
- startKafka();
-
// Push data to Kafka and set up table
setupTable(getTableName(), getKafkaTopic(), "gameScores_csv.tar.gz", null);
@@ -223,6 +224,22 @@ public class PinotSinkUpsertTableIntegrationTest extends
BaseClusterIntegrationT
return tableConfig;
}
+ @AfterClass
+ public void tearDown()
+ throws Exception {
+ try {
+ dropRealtimeTable(getTableName());
+ stopMinion();
+ stopServer();
+ stopBroker();
+ stopController();
+ stopKafka();
+ stopZk();
+ } finally {
+ FileUtils.deleteQuietly(_tempDir);
+ }
+ }
+
@Override
protected String getSchemaFileName() {
return "upsert_table_test.schema";
diff --git
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index c1784b43eea..d27eb3e094d 100644
---
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -189,6 +189,13 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
return useKafkaTransaction() ? DEFAULT_TRANSACTION_NUM_KAFKA_BROKERS :
DEFAULT_LLC_NUM_KAFKA_BROKERS;
}
+ /**
+ * Override to pass extra Kafka broker config properties when starting the
embedded Kafka cluster.
+ */
+ protected Properties getKafkaExtraProperties() {
+ return new Properties();
+ }
+
protected int getKafkaPort() {
int idx = RANDOM.nextInt(_kafkaStarters.size());
return _kafkaStarters.get(idx).getPort();
@@ -817,6 +824,7 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
int requestedBrokers = getNumKafkaBrokers();
Properties props = new Properties();
props.setProperty(EmbeddedKafkaCluster.BROKER_COUNT_PROP,
Integer.toString(requestedBrokers));
+ props.putAll(getKafkaExtraProperties());
EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster();
cluster.init(props);
cluster.start();
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java
index bb91cf3338b..9c3a933c27e 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java
@@ -100,6 +100,7 @@ public abstract class BasePauselessRealtimeIngestionTest
extends BaseClusterInte
throws Exception {
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
startZk();
+ startKafka();
startController();
startBroker();
startServer();
@@ -113,7 +114,6 @@ public abstract class BasePauselessRealtimeIngestionTest
extends BaseClusterInte
protected void setupNonPauselessTable()
throws Exception {
_avroFiles = unpackAvroData(_tempDir);
- startKafka();
pushAvroIntoKafka(_avroFiles);
Schema schema = createSchema();
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java
index f7492846fc8..aa6ff7e59c3 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java
@@ -48,6 +48,8 @@ public abstract class BaseRealtimeClusterIntegrationTest
extends BaseClusterInte
// Start the Pinot cluster
startZk();
+ // Start Kafka
+ startKafka();
startController();
HelixConfigScope scope =
@@ -63,9 +65,6 @@ public abstract class BaseRealtimeClusterIntegrationTest
extends BaseClusterInte
startBroker();
startServer();
- // Start Kafka
- startKafka();
-
// Unpack the Avro files
List<File> avroFiles = unpackAvroData(_tempDir);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BrokerQueryLimitTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BrokerQueryLimitTest.java
index 068b8c1cf00..59f816140eb 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BrokerQueryLimitTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BrokerQueryLimitTest.java
@@ -167,18 +167,16 @@ public class BrokerQueryLimitTest extends
BaseClusterIntegrationTest {
public void tearDown()
throws Exception {
LOGGER.warn("Tearing down integration test class: {}",
getClass().getSimpleName());
- dropOfflineTable(getTableName());
- FileUtils.deleteDirectory(_tempDir);
-
- // Stop Kafka
- LOGGER.warn("Stop Kafka in the integration test class");
- stopKafka();
- // Shutdown the Pinot cluster
- stopServer();
- stopBroker();
- stopController();
- stopZk();
- FileUtils.deleteDirectory(_tempDir);
+ try {
+ dropOfflineTable(getTableName());
+ stopServer();
+ stopBroker();
+ stopController();
+ stopKafka();
+ stopZk();
+ } finally {
+ FileUtils.deleteQuietly(_tempDir);
+ }
LOGGER.warn("Finished tearing down integration test class: {}",
getClass().getSimpleName());
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
index dab36f2c1a9..e9f409bfe17 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
@@ -30,6 +31,7 @@ import
org.apache.pinot.spi.config.table.ingestion.TransformConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -50,12 +52,11 @@ public class CLPEncodingRealtimeIntegrationTest extends
BaseClusterIntegrationTe
// Start the Pinot cluster
startZk();
+ startKafka();
// Start a customized controller with more frequent realtime segment
validation
startController();
startBroker();
startServer();
-
- startKafka();
pushAvroIntoKafka(_avroFiles);
Schema schema = createSchema();
@@ -103,6 +104,21 @@ public class CLPEncodingRealtimeIntegrationTest extends
BaseClusterIntegrationTe
.getLong(0), 53);
}
+ @AfterClass
+ public void tearDown()
+ throws Exception {
+ try {
+ dropRealtimeTable(getTableName());
+ stopServer();
+ stopBroker();
+ stopController();
+ stopKafka();
+ stopZk();
+ } finally {
+ FileUtils.deleteQuietly(_tempDir);
+ }
+ }
+
protected int getRealtimeSegmentFlushSize() {
return 30;
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
index b5c4fe12979..9aff311ed91 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
@@ -45,9 +45,12 @@ import
org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.util.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ExactlyOnceKafkaRealtimeClusterIntegrationTest extends
BaseRealtimeClusterIntegrationTest {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ExactlyOnceKafkaRealtimeClusterIntegrationTest.class);
private static final int REALTIME_TABLE_CONFIG_RETRY_COUNT = 5;
private static final long REALTIME_TABLE_CONFIG_RETRY_WAIT_MS = 1_000L;
private static final long KAFKA_TOPIC_METADATA_READY_TIMEOUT_MS = 30_000L;
@@ -80,6 +83,13 @@ public class ExactlyOnceKafkaRealtimeClusterIntegrationTest
extends BaseRealtime
return true;
}
+ @Override
+ protected Properties getKafkaExtraProperties() {
+ Properties props = new Properties();
+ props.setProperty("log.flush.interval.messages", "1");
+ return props;
+ }
+
@Override
protected int getNumKafkaBrokers() {
return DEFAULT_TRANSACTION_NUM_KAFKA_BROKERS;
@@ -94,9 +104,8 @@ public class ExactlyOnceKafkaRealtimeClusterIntegrationTest
extends BaseRealtime
protected void pushAvroIntoKafka(List<File> avroFiles)
throws Exception {
String kafkaBrokerList = getKafkaBrokerList();
- // Use System.err for diagnostics - log4j2 console appender is filtered to
ERROR in CI
- System.err.println("[ExactlyOnce] Pushing transactional data to Kafka at:
" + kafkaBrokerList);
- System.err.println("[ExactlyOnce] Avro files count: " + avroFiles.size());
+ LOGGER.info("Pushing transactional data to Kafka at: {}", kafkaBrokerList);
+ LOGGER.info("Avro files count: {}", avroFiles.size());
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaBrokerList);
@@ -115,19 +124,19 @@ public class
ExactlyOnceKafkaRealtimeClusterIntegrationTest extends BaseRealtime
// transaction operations until the abort is fully done (markers written).
try (KafkaProducer<byte[], byte[]> producer = new
KafkaProducer<>(producerProps)) {
producer.initTransactions();
- System.err.println("[ExactlyOnce] initTransactions() succeeded");
+ LOGGER.info("initTransactions() succeeded");
// Transaction 1: aborted batch
long abortedCount = pushAvroRecords(producer, avroFiles, false);
- System.err.println("[ExactlyOnce] Aborted batch: " + abortedCount + "
records");
+ LOGGER.info("Aborted batch: {} records", abortedCount);
// Transaction 2: committed batch
long committedCount = pushAvroRecords(producer, avroFiles, true);
- System.err.println("[ExactlyOnce] Committed batch: " + committedCount +
" records");
+ LOGGER.info("Committed batch: {} records", committedCount);
}
// After producer is closed, verify data visibility with independent
consumers
- System.err.println("[ExactlyOnce] Producer closed. Verifying data
visibility...");
+ LOGGER.info("Producer closed. Verifying data visibility...");
waitForCommittedRecordsVisible(kafkaBrokerList);
}
@@ -136,7 +145,7 @@ public class ExactlyOnceKafkaRealtimeClusterIntegrationTest
extends BaseRealtime
* This ensures transaction markers have been fully propagated before
returning.
*/
private void waitForCommittedRecordsVisible(String brokerList) {
- long deadline = System.currentTimeMillis() + 60_000L;
+ long deadline = System.currentTimeMillis() + 120_000L;
int lastCommitted = 0;
int lastUncommitted = 0;
int iteration = 0;
@@ -145,15 +154,14 @@ public class
ExactlyOnceKafkaRealtimeClusterIntegrationTest extends BaseRealtime
iteration++;
lastCommitted = countRecords(brokerList, "read_committed");
if (lastCommitted > 0) {
- System.err.println("[ExactlyOnce] Verification OK: read_committed=" +
lastCommitted
- + " after " + iteration + " iterations");
+ LOGGER.info("Verification OK: read_committed={} after {} iterations",
lastCommitted, iteration);
return;
}
// Check if data reached Kafka at all
if (iteration == 1 || iteration % 5 == 0) {
lastUncommitted = countRecords(brokerList, "read_uncommitted");
- System.err.println("[ExactlyOnce] Verification iteration " + iteration
- + ": read_committed=" + lastCommitted + ", read_uncommitted=" +
lastUncommitted);
+ LOGGER.info("Verification iteration {}: read_committed={},
read_uncommitted={}", iteration, lastCommitted,
+ lastUncommitted);
}
try {
Thread.sleep(2_000L);
@@ -165,9 +173,9 @@ public class ExactlyOnceKafkaRealtimeClusterIntegrationTest
extends BaseRealtime
// Final diagnostic dump
lastUncommitted = countRecords(brokerList, "read_uncommitted");
- System.err.println("[ExactlyOnce] VERIFICATION FAILED after 60s:
read_committed=" + lastCommitted
- + ", read_uncommitted=" + lastUncommitted);
- throw new AssertionError("[ExactlyOnce] Transaction markers were not
propagated within 60s; "
+ LOGGER.error("VERIFICATION FAILED after 120s: read_committed={},
read_uncommitted={}", lastCommitted,
+ lastUncommitted);
+ throw new AssertionError("[ExactlyOnce] Transaction markers were not
propagated within 120s; "
+ "committed records are not visible to read_committed consumers. "
+ "read_committed=" + lastCommitted + ", read_uncommitted=" +
lastUncommitted);
}
@@ -251,7 +259,7 @@ public class ExactlyOnceKafkaRealtimeClusterIntegrationTest
extends BaseRealtime
try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
List<PartitionInfo> partitions = consumer.partitionsFor(getKafkaTopic(),
Duration.ofSeconds(10));
if (partitions == null || partitions.isEmpty()) {
- System.err.println("[ExactlyOnce] No partitions found for topic " +
getKafkaTopic());
+ LOGGER.warn("No partitions found for topic {}", getKafkaTopic());
return 0;
}
for (PartitionInfo pi : partitions) {
@@ -270,7 +278,7 @@ public class ExactlyOnceKafkaRealtimeClusterIntegrationTest
extends BaseRealtime
totalRecords += partitionRecords;
}
} catch (Exception e) {
- System.err.println("[ExactlyOnce] Error counting records with " +
isolationLevel + ": " + e.getMessage());
+ LOGGER.error("Error counting records with {}: {}", isolationLevel,
e.getMessage());
}
return totalRecords;
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
index c25af339add..f0375a74ca0 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
@@ -51,13 +51,12 @@ public class NullHandlingIntegrationTest extends
BaseClusterIntegrationTestSet
// Start the Pinot cluster
startZk();
+ // Start Kafka
+ startKafka();
startController();
startBroker();
startServer();
- // Start Kafka
- startKafka();
-
// Unpack the Avro files
List<File> avroFiles = unpackAvroData(_tempDir);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
index 70aa705dff2..e3bbde37258 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
@@ -83,13 +83,12 @@ public class PartialUpsertTableRebalanceIntegrationTest
extends BaseClusterInteg
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
startZk();
+ // Start Kafka
+ startKafka();
startController();
startBroker();
startServers(NUM_SERVERS);
- // Start Kafka and push data into Kafka
- startKafka();
-
_resourceManager = _controllerStarter.getHelixResourceManager();
_tableRebalancer = new
TableRebalancer(_resourceManager.getHelixZkManager());
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java
index 1cd5842fbd9..23c9bcc086d 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java
@@ -92,6 +92,8 @@ public class
PauselessRealtimeIngestionSegmentCommitFailureTest extends BaseClus
// Start the Pinot cluster
startZk();
+ // Start Kafka
+ startKafka();
// Start a customized controller with more frequent realtime segment
validation
startController();
startBroker();
@@ -99,7 +101,6 @@ public class
PauselessRealtimeIngestionSegmentCommitFailureTest extends BaseClus
// load data in kafka
List<File> avroFiles = unpackAvroData(_tempDir);
- startKafka();
pushAvroIntoKafka(avroFiles);
setMaxSegmentCompletionTimeMillis();
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryWorkloadIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryWorkloadIntegrationTest.java
index ce175b81e8e..00db1afc4fd 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryWorkloadIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryWorkloadIntegrationTest.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -43,6 +44,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -72,10 +74,10 @@ public class QueryWorkloadIntegrationTest extends
BaseClusterIntegrationTest {
// Start Zk, Kafka and Pinot
startZk();
+ startKafka();
startController();
startBroker();
startServer();
- startKafka();
List<File> avroFiles = getAllAvroFiles();
List<File> offlineAvroFiles = getOfflineAvroFiles(avroFiles,
NUM_OFFLINE_SEGMENTS);
@@ -115,6 +117,22 @@ public class QueryWorkloadIntegrationTest extends
BaseClusterIntegrationTest {
waitForAllDocsLoaded(100_000L);
}
+ @AfterClass
+ public void tearDown()
+ throws Exception {
+ try {
+ dropOfflineTable(getTableName());
+ dropRealtimeTable(getTableName());
+ stopServer();
+ stopBroker();
+ stopController();
+ stopKafka();
+ stopZk();
+ } finally {
+ FileUtils.deleteQuietly(_tempDir);
+ }
+ }
+
// TODO: Expand tests to cover more scenarios for workload enforcement
@Test
public void testQueryWorkloadConfig() throws Exception {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RetentionManagerIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RetentionManagerIntegrationTest.java
index d1b229dbb15..cc14afc1b5a 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RetentionManagerIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RetentionManagerIntegrationTest.java
@@ -24,6 +24,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
+import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig;
@@ -36,6 +37,7 @@ import org.apache.pinot.util.TestUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -71,6 +73,7 @@ public class RetentionManagerIntegrationTest extends
BaseClusterIntegrationTest
throws Exception {
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
startZk();
+ startKafka();
startController();
startBroker();
startServer();
@@ -81,7 +84,6 @@ public class RetentionManagerIntegrationTest extends
BaseClusterIntegrationTest
protected void setupTable()
throws Exception {
_avroFiles = unpackAvroData(_tempDir);
- startKafka();
pushAvroIntoKafka(_avroFiles);
Schema schema = createSchema();
@@ -95,6 +97,21 @@ public class RetentionManagerIntegrationTest extends
BaseClusterIntegrationTest
waitForDocsLoaded(600_000L, true, tableConfig.getTableName());
}
+ @AfterClass
+ public void tearDown()
+ throws Exception {
+ try {
+ dropRealtimeTable(getTableName());
+ stopServer();
+ stopBroker();
+ stopController();
+ stopKafka();
+ stopZk();
+ } finally {
+ FileUtils.deleteQuietly(_tempDir);
+ }
+ }
+
@Test
public void testClusterConfigChangeListener()
throws IOException, URISyntaxException {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StaleSegmentCheckIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StaleSegmentCheckIntegrationTest.java
index c2fbd83cb94..c1045bae7b4 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StaleSegmentCheckIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StaleSegmentCheckIntegrationTest.java
@@ -73,12 +73,12 @@ public class StaleSegmentCheckIntegrationTest extends
BaseClusterIntegrationTest
// Start the Pinot cluster
startZk();
+ // Start Kafka
+ startKafka();
startController();
startBroker();
startServer();
startMinion();
- // Start Kafka
- startKafka();
_taskManager = _controllerStarter.getTaskManager();
_taskResourceManager = _controllerStarter.getHelixTaskResourceManager();
@@ -193,6 +193,7 @@ public class StaleSegmentCheckIntegrationTest extends
BaseClusterIntegrationTest
stopServer();
stopBroker();
stopController();
+ stopKafka();
stopZk();
} finally {
FileUtils.deleteQuietly(_tempDir);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalancePauselessIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalancePauselessIntegrationTest.java
index 945fac33506..5a72a3c1e22 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalancePauselessIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalancePauselessIntegrationTest.java
@@ -73,6 +73,7 @@ public class TableRebalancePauselessIntegrationTest extends
BasePauselessRealtim
throws Exception {
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
startZk();
+ startKafka();
startController();
startBroker();
startServer();
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
index 5714057a7a8..739214cfa5c 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
@@ -85,13 +85,13 @@ public abstract class CustomDataQueryClusterIntegrationTest
extends BaseClusterI
public void tearDownSuite()
throws Exception {
LOGGER.warn("Tearing down integration test suite");
- // Stop Kafka
- LOGGER.warn("Stop Kafka in the integration test suite");
- stopKafka();
// Shutdown the Pinot cluster
stopServer();
stopBroker();
stopController();
+ // Stop Kafka
+ LOGGER.warn("Stop Kafka in the integration test suite");
+ stopKafka();
stopZk();
FileUtils.deleteDirectory(_tempDir);
LOGGER.warn("Finished tearing down integration test suite");
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
index b06a6d1481b..877e1b25489 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
@@ -94,13 +94,13 @@ public abstract class BaseLogicalTableIntegrationTest
extends BaseClusterIntegra
public void tearDownSuite()
throws Exception {
LOGGER.info("Tearing down integration test suite");
- // Stop Kafka
- LOGGER.info("Stop Kafka in the integration test suite");
- stopKafka();
// Shutdown the Pinot cluster
stopServer();
stopBroker();
stopController();
+ // Stop Kafka
+ LOGGER.info("Stop Kafka in the integration test suite");
+ stopKafka();
stopZk();
FileUtils.deleteDirectory(_tempDir);
LOGGER.info("Finished tearing down integration test suite");
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/server/EmbeddedKafkaCluster.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/server/EmbeddedKafkaCluster.java
index 56b3fe8eef2..a8296c172ba 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/server/EmbeddedKafkaCluster.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/server/EmbeddedKafkaCluster.java
@@ -46,12 +46,20 @@ public class EmbeddedKafkaCluster implements
StreamDataServerStartable {
private static final int TOPIC_MUTATION_RETRIES = 5;
private int _brokerCount = 1;
+ private final Properties _extraConfigProps = new Properties();
private KafkaClusterTestKit _cluster;
private String _bootstrapServers;
@Override
public void init(Properties props) {
_brokerCount = Integer.parseInt(props.getProperty(BROKER_COUNT_PROP, "1"));
+ _extraConfigProps.clear();
+ // Forward any additional properties (excluding our internal ones) as
Kafka broker config
+ for (String key : props.stringPropertyNames()) {
+ if (!key.equals(BROKER_COUNT_PROP)) {
+ _extraConfigProps.setProperty(key, props.getProperty(key));
+ }
+ }
}
@Override
@@ -67,15 +75,20 @@ public class EmbeddedKafkaCluster implements
StreamDataServerStartable {
.setBootstrapMetadataVersion(MetadataVersion.latestProduction())
.build();
- _cluster = new KafkaClusterTestKit.Builder(nodes)
+ KafkaClusterTestKit.Builder builder = new
KafkaClusterTestKit.Builder(nodes)
.setConfigProp("offsets.topic.replication.factor",
String.valueOf(replicationFactor))
.setConfigProp("offsets.topic.num.partitions", "1")
.setConfigProp("transaction.state.log.replication.factor",
String.valueOf(replicationFactor))
.setConfigProp("transaction.state.log.min.isr", "1")
.setConfigProp("transaction.state.log.num.partitions", "1")
- .setConfigProp("group.initial.rebalance.delay.ms", "0")
+ .setConfigProp("group.initial.rebalance.delay.ms", "0");
- .build();
+ // Apply any extra config properties passed via init()
+ for (String key : _extraConfigProps.stringPropertyNames()) {
+ builder.setConfigProp(key, _extraConfigProps.getProperty(key));
+ }
+
+ _cluster = builder.build();
_cluster.format();
_cluster.startup();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]