This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8ae1048bdb6 bugfix: CustomDataQueryClusterIntegrationTest shouldn't
create kafka topic during the setupSuite call and use kafka binary for
compatibility tests (#17738)
8ae1048bdb6 is described below
commit 8ae1048bdb679d6c1d1d829803079c81add3853c
Author: Xiang Fu <[email protected]>
AuthorDate: Sun Feb 22 19:01:43 2026 -0800
bugfix: CustomDataQueryClusterIntegrationTest shouldn't create kafka topic
during the setupSuite call and use kafka binary for compatibility tests (#17738)
* bugfix: don't create kafka topic during the kafka start for
CustomDataQueryClusterIntegrationTest
* move kafka start/stop commands from the pinot-admin
---
compatibility-verifier/compCheck.sh | 70 +++++++++++++++++++--
...tlyOnceKafkaRealtimeClusterIntegrationTest.java | 72 ++++++++++++++++++++++
.../CustomDataQueryClusterIntegrationTest.java | 2 +-
3 files changed, 138 insertions(+), 6 deletions(-)
diff --git a/compatibility-verifier/compCheck.sh
b/compatibility-verifier/compCheck.sh
index c90f8d75302..84f899da867 100755
--- a/compatibility-verifier/compCheck.sh
+++ b/compatibility-verifier/compCheck.sh
@@ -110,8 +110,8 @@ function waitForKafkaReady() {
status=1
while [ $status -ne 0 ]; do
sleep 1
- echo "Checking port 19092 for kafka ready"
- echo x | nc localhost 19092 1>/dev/null 2>&1
+ echo "Checking port ${KAFKA_PORT} for kafka ready"
+ echo x | nc localhost ${KAFKA_PORT} 1>/dev/null 2>&1
status=$?
done
}
@@ -196,8 +196,7 @@ function startService() {
./pinot-admin.sh StartServer ${configFileArg}
1>${LOG_DIR}/server2.${logCount}.log 2>&1 &
echo $! >${PID_DIR}/server2.pid
elif [ "$serviceName" = "kafka" ]; then
- ./pinot-admin.sh StartKafka -zkAddress localhost:${ZK_PORT}/kafka
1>${LOG_DIR}/kafka.${logCount}.log 2>&1 &
- echo $! >${PID_DIR}/kafka.pid
+ startKafkaService
fi
# Keep log files distinct so we can debug
logCount=$((logCount + 1))
@@ -206,12 +205,63 @@ function startService() {
popd 1>/dev/null || exit 1
}
+function setupKafkaBinary() {
+ if [ ! -x "${KAFKA_HOME}/bin/kafka-server-start.sh" ]; then
+ echo "Setting up Kafka from ${KAFKA_DOWNLOAD_URL}"
+ rm -rf "${KAFKA_HOME}"
+ rm -rf "${workingDir}/kafka_${KAFKA_SCALA_VERSION}-${KAFKA_VERSION}"
+ if [ ! -f "${KAFKA_ARCHIVE}" ]; then
+ curl -fSL "${KAFKA_DOWNLOAD_URL}" -o "${KAFKA_ARCHIVE}"
1>${LOG_DIR}/kafka.download.${logCount}.log 2>&1 || exit 1
+ fi
+ tar -xzf "${KAFKA_ARCHIVE}" -C "${workingDir}"
1>>${LOG_DIR}/kafka.download.${logCount}.log 2>&1 || exit 1
+ mv "${workingDir}/kafka_${KAFKA_SCALA_VERSION}-${KAFKA_VERSION}"
"${KAFKA_HOME}"
+ fi
+
+ rm -rf "${KAFKA_DATA_DIR}"
+ mkdir -p "${KAFKA_DATA_DIR}"
+ cat <<EOF >"${KAFKA_CONFIG_FILE}"
+listeners=${KAFKA_LISTENER}
+advertised.listeners=${KAFKA_LISTENER}
+num.io.threads=8
+num.network.threads=3
+socket.send.buffer.bytes=102400
+socket.receive.buffer.bytes=102400
+socket.request.max.bytes=104857600
+log.dirs=${KAFKA_DATA_DIR}
+num.partitions=1
+num.recovery.threads.per.data.dir=1
+offsets.topic.replication.factor=1
+transaction.state.log.replication.factor=1
+transaction.state.log.min.isr=1
+zookeeper.connect=localhost:${ZK_PORT}/kafka
+zookeeper.connection.timeout.ms=6000
+EOF
+}
+
+function startKafkaService() {
+ setupKafkaBinary
+ "${KAFKA_HOME}/bin/kafka-server-start.sh" "${KAFKA_CONFIG_FILE}"
1>${LOG_DIR}/kafka.${logCount}.log 2>&1 &
+ echo $! >${PID_DIR}/kafka.pid
+}
+
+function stopKafkaService() {
+ if [ -x "${KAFKA_HOME}/bin/kafka-server-stop.sh" ]; then
+ "${KAFKA_HOME}/bin/kafka-server-stop.sh" --config "${KAFKA_CONFIG_FILE}"
1>${LOG_DIR}/kafka.stop.${logCount}.log 2>&1
+ else
+ kill -9 $1 1>/dev/null 2>&1
+ fi
+}
+
# Given a component, check if it known to be running and stop that specific
component
function stopService() {
serviceName=$1
if [ -f "${PID_DIR}/${serviceName}".pid ]; then
pid=$(cat "${PID_DIR}/${serviceName}".pid)
- kill -9 $pid 1>/dev/null 2>&1
+ if [ "$serviceName" = "kafka" ]; then
+ stopKafkaService $pid
+ else
+ kill -9 $pid 1>/dev/null 2>&1
+ fi
# TODO Kill without -9 and add a while loop waiting for process to die
status=0
while [ $status -ne 1 ]; do
@@ -404,6 +454,15 @@ SERVER_NETTY_PORT=8098
SERVER_2_NETTY_PORT=9098
SERVER_GRPC_PORT=8090
SERVER_2_GRPC_PORT=9090
+KAFKA_VERSION="${KAFKA_VERSION:-3.9.1}"
+KAFKA_SCALA_VERSION="${KAFKA_SCALA_VERSION:-2.13}"
+KAFKA_DOWNLOAD_URL="${KAFKA_DOWNLOAD_URL:-https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${KAFKA_SCALA_VERSION}-${KAFKA_VERSION}.tgz}"
+KAFKA_PORT="${KAFKA_PORT:-19092}"
+KAFKA_LISTENER="PLAINTEXT://127.0.0.1:${KAFKA_PORT}"
+KAFKA_HOME="${workingDir}/kafka"
+KAFKA_ARCHIVE="${workingDir}/kafka_${KAFKA_SCALA_VERSION}-${KAFKA_VERSION}.tgz"
+KAFKA_DATA_DIR="${workingDir}/kafka-data"
+KAFKA_CONFIG_FILE="${workingDir}/kafka-server.properties"
PID_DIR=${workingDir}/pids
LOG_DIR=${workingDir}/logs
@@ -427,6 +486,7 @@ setupCompatTester
# check that the default ports are open
if ! checkPortAvailable ${SERVER_ADMIN_PORT} || ! checkPortAvailable
${SERVER_NETTY_PORT} || ! checkPortAvailable ${SERVER_GRPC_PORT} ||
! checkPortAvailable ${BROKER_QUERY_PORT} || ! checkPortAvailable
${CONTROLLER_PORT} || ! checkPortAvailable ${ZK_PORT} ||
+ ! checkPortAvailable ${KAFKA_PORT} ||
{ [ -f "${SERVER_CONF_2}" ] && { ! checkPortAvailable ${SERVER_2_ADMIN_PORT}
|| ! checkPortAvailable ${SERVER_2_NETTY_PORT} || ! checkPortAvailable
${SERVER_2_GRPC_PORT}; } ; }; then
exit 1
fi
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
index 6bed1cb3b47..d11ad4ac456 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
@@ -19,11 +19,48 @@
package org.apache.pinot.integration.tests;
import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.util.TestUtils;
public class ExactlyOnceKafkaRealtimeClusterIntegrationTest extends
BaseRealtimeClusterIntegrationTest {
+ private static final int REALTIME_TABLE_CONFIG_RETRY_COUNT = 5;
+ private static final long REALTIME_TABLE_CONFIG_RETRY_WAIT_MS = 1_000L;
+ private static final long KAFKA_TOPIC_METADATA_READY_TIMEOUT_MS = 30_000L;
+
+ @Override
+ public void addTableConfig(TableConfig tableConfig)
+ throws IOException {
+ for (int attempt = 1; attempt <= REALTIME_TABLE_CONFIG_RETRY_COUNT;
attempt++) {
+ try {
+ super.addTableConfig(tableConfig);
+ return;
+ } catch (IOException e) {
+ if (!isRetryableRealtimePartitionMetadataError(e) || attempt ==
REALTIME_TABLE_CONFIG_RETRY_COUNT) {
+ throw e;
+ }
+ waitForKafkaTopicMetadataReadyForConsumer(getKafkaTopic(),
getNumKafkaPartitions());
+ try {
+ Thread.sleep(REALTIME_TABLE_CONFIG_RETRY_WAIT_MS);
+ } catch (InterruptedException interruptedException) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while retrying realtime table
creation for topic: " + getKafkaTopic(),
+ interruptedException);
+ }
+ }
+ }
+ }
+
@Override
protected boolean useKafkaTransaction() {
return true;
@@ -52,4 +89,39 @@ public class ExactlyOnceKafkaRealtimeClusterIntegrationTest
extends BaseRealtime
.pushAvroIntoKafkaWithTransaction(avroFiles, kafkaBrokerList,
getKafkaTopic(),
getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(),
getPartitionColumn(), true);
}
+
+ private boolean isRetryableRealtimePartitionMetadataError(Throwable
throwable) {
+ String errorToken = "Failed to fetch partition information for topic: " +
getKafkaTopic();
+ Throwable current = throwable;
+ while (current != null) {
+ String message = current.getMessage();
+ if (message != null && message.contains(errorToken)) {
+ return true;
+ }
+ current = current.getCause();
+ }
+ return false;
+ }
+
+ private void waitForKafkaTopicMetadataReadyForConsumer(String topic, int
expectedPartitions) {
+ TestUtils.waitForCondition(aVoid ->
isKafkaTopicMetadataReadyForConsumer(topic, expectedPartitions), 200L,
+ KAFKA_TOPIC_METADATA_READY_TIMEOUT_MS,
+ "Kafka topic '" + topic + "' metadata is not visible to consumers");
+ }
+
+ private boolean isKafkaTopicMetadataReadyForConsumer(String topic, int
expectedPartitions) {
+ Properties consumerProps = new Properties();
+ consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
getKafkaBrokerList());
+ consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG,
"pinot-kafka-topic-ready-" + UUID.randomUUID());
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
+ consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
+ consumerProps.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000");
+ consumerProps.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "5000");
+ try (KafkaConsumer<byte[], byte[]> consumer = new
KafkaConsumer<>(consumerProps)) {
+ List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic,
Duration.ofSeconds(5));
+ return partitionInfos != null && partitionInfos.size() >=
expectedPartitions;
+ } catch (Exception e) {
+ return false;
+ }
+ }
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
index cac229e8bec..5714057a7a8 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
@@ -74,7 +74,7 @@ public abstract class CustomDataQueryClusterIntegrationTest
extends BaseClusterI
// Start the Pinot cluster
startZk();
LOGGER.warn("Start Kafka in the integration test suite");
- startKafka();
+ startKafkaWithoutTopic();
startController();
startBroker();
startServer();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]