This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ae1048bdb6 bugfix: CustomDataQueryClusterIntegrationTest shouldn't 
create the Kafka topic during the setupSuite call; use the Kafka binary for 
compatibility tests (#17738)
8ae1048bdb6 is described below

commit 8ae1048bdb679d6c1d1d829803079c81add3853c
Author: Xiang Fu <[email protected]>
AuthorDate: Sun Feb 22 19:01:43 2026 -0800

    bugfix: CustomDataQueryClusterIntegrationTest shouldn't create the Kafka topic 
during the setupSuite call; use the Kafka binary for compatibility tests (#17738)
    
    * bugfix: don't create the Kafka topic when starting Kafka for 
CustomDataQueryClusterIntegrationTest
    
    * move the Kafka start/stop commands from pinot-admin to the Kafka binary scripts
---
 compatibility-verifier/compCheck.sh                | 70 +++++++++++++++++++--
 ...tlyOnceKafkaRealtimeClusterIntegrationTest.java | 72 ++++++++++++++++++++++
 .../CustomDataQueryClusterIntegrationTest.java     |  2 +-
 3 files changed, 138 insertions(+), 6 deletions(-)

diff --git a/compatibility-verifier/compCheck.sh 
b/compatibility-verifier/compCheck.sh
index c90f8d75302..84f899da867 100755
--- a/compatibility-verifier/compCheck.sh
+++ b/compatibility-verifier/compCheck.sh
@@ -110,8 +110,8 @@ function waitForKafkaReady() {
   status=1
   while [ $status -ne 0 ]; do
     sleep 1
-    echo "Checking port 19092 for kafka ready"
-    echo x | nc localhost 19092 1>/dev/null 2>&1
+    echo "Checking port ${KAFKA_PORT} for kafka ready"
+    echo x | nc localhost ${KAFKA_PORT} 1>/dev/null 2>&1
     status=$?
   done
 }
@@ -196,8 +196,7 @@ function startService() {
     ./pinot-admin.sh StartServer ${configFileArg} 
1>${LOG_DIR}/server2.${logCount}.log 2>&1 &
         echo $! >${PID_DIR}/server2.pid
   elif [ "$serviceName" = "kafka" ]; then
-    ./pinot-admin.sh StartKafka -zkAddress localhost:${ZK_PORT}/kafka 
1>${LOG_DIR}/kafka.${logCount}.log 2>&1 &
-    echo $! >${PID_DIR}/kafka.pid
+    startKafkaService
   fi
   # Keep log files distinct so we can debug
   logCount=$((logCount + 1))
@@ -206,12 +205,63 @@ function startService() {
   popd 1>/dev/null || exit 1
 }
 
+function setupKafkaBinary() {
+  if [ ! -x "${KAFKA_HOME}/bin/kafka-server-start.sh" ]; then
+    echo "Setting up Kafka from ${KAFKA_DOWNLOAD_URL}"
+    rm -rf "${KAFKA_HOME}"
+    rm -rf "${workingDir}/kafka_${KAFKA_SCALA_VERSION}-${KAFKA_VERSION}"
+    if [ ! -f "${KAFKA_ARCHIVE}" ]; then
+      curl -fSL "${KAFKA_DOWNLOAD_URL}" -o "${KAFKA_ARCHIVE}" 
1>${LOG_DIR}/kafka.download.${logCount}.log 2>&1 || exit 1
+    fi
+    tar -xzf "${KAFKA_ARCHIVE}" -C "${workingDir}" 
1>>${LOG_DIR}/kafka.download.${logCount}.log 2>&1 || exit 1
+    mv "${workingDir}/kafka_${KAFKA_SCALA_VERSION}-${KAFKA_VERSION}" 
"${KAFKA_HOME}"
+  fi
+
+  rm -rf "${KAFKA_DATA_DIR}"
+  mkdir -p "${KAFKA_DATA_DIR}"
+  cat <<EOF >"${KAFKA_CONFIG_FILE}"
+listeners=${KAFKA_LISTENER}
+advertised.listeners=${KAFKA_LISTENER}
+num.io.threads=8
+num.network.threads=3
+socket.send.buffer.bytes=102400
+socket.receive.buffer.bytes=102400
+socket.request.max.bytes=104857600
+log.dirs=${KAFKA_DATA_DIR}
+num.partitions=1
+num.recovery.threads.per.data.dir=1
+offsets.topic.replication.factor=1
+transaction.state.log.replication.factor=1
+transaction.state.log.min.isr=1
+zookeeper.connect=localhost:${ZK_PORT}/kafka
+zookeeper.connection.timeout.ms=6000
+EOF
+}
+
+function startKafkaService() {
+  setupKafkaBinary
+  "${KAFKA_HOME}/bin/kafka-server-start.sh" "${KAFKA_CONFIG_FILE}" 
1>${LOG_DIR}/kafka.${logCount}.log 2>&1 &
+  echo $! >${PID_DIR}/kafka.pid
+}
+
+function stopKafkaService() {
+  if [ -x "${KAFKA_HOME}/bin/kafka-server-stop.sh" ]; then
+    "${KAFKA_HOME}/bin/kafka-server-stop.sh" --config "${KAFKA_CONFIG_FILE}" 
1>${LOG_DIR}/kafka.stop.${logCount}.log 2>&1
+  else
+    kill -9 $1 1>/dev/null 2>&1
+  fi
+}
+
 # Given a component, check if it known to be running and stop that specific 
component
 function stopService() {
   serviceName=$1
   if [ -f "${PID_DIR}/${serviceName}".pid ]; then
     pid=$(cat "${PID_DIR}/${serviceName}".pid)
-    kill -9 $pid 1>/dev/null 2>&1
+    if [ "$serviceName" = "kafka" ]; then
+      stopKafkaService $pid
+    else
+      kill -9 $pid 1>/dev/null 2>&1
+    fi
     # TODO Kill without -9 and add a while loop waiting for process to die
     status=0
     while [ $status -ne 1 ]; do
@@ -404,6 +454,15 @@ SERVER_NETTY_PORT=8098
 SERVER_2_NETTY_PORT=9098
 SERVER_GRPC_PORT=8090
 SERVER_2_GRPC_PORT=9090
+KAFKA_VERSION="${KAFKA_VERSION:-3.9.1}"
+KAFKA_SCALA_VERSION="${KAFKA_SCALA_VERSION:-2.13}"
+KAFKA_DOWNLOAD_URL="${KAFKA_DOWNLOAD_URL:-https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${KAFKA_SCALA_VERSION}-${KAFKA_VERSION}.tgz}"
+KAFKA_PORT="${KAFKA_PORT:-19092}"
+KAFKA_LISTENER="PLAINTEXT://127.0.0.1:${KAFKA_PORT}"
+KAFKA_HOME="${workingDir}/kafka"
+KAFKA_ARCHIVE="${workingDir}/kafka_${KAFKA_SCALA_VERSION}-${KAFKA_VERSION}.tgz"
+KAFKA_DATA_DIR="${workingDir}/kafka-data"
+KAFKA_CONFIG_FILE="${workingDir}/kafka-server.properties"
 
 PID_DIR=${workingDir}/pids
 LOG_DIR=${workingDir}/logs
@@ -427,6 +486,7 @@ setupCompatTester
 # check that the default ports are open
 if ! checkPortAvailable ${SERVER_ADMIN_PORT} || ! checkPortAvailable 
${SERVER_NETTY_PORT} || ! checkPortAvailable ${SERVER_GRPC_PORT} ||
   ! checkPortAvailable ${BROKER_QUERY_PORT} || ! checkPortAvailable 
${CONTROLLER_PORT} || ! checkPortAvailable ${ZK_PORT} ||
+  ! checkPortAvailable ${KAFKA_PORT} ||
   { [ -f "${SERVER_CONF_2}" ] && { ! checkPortAvailable ${SERVER_2_ADMIN_PORT} 
|| ! checkPortAvailable ${SERVER_2_NETTY_PORT} || ! checkPortAvailable 
${SERVER_2_GRPC_PORT}; } ; }; then
   exit 1
 fi
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
index 6bed1cb3b47..d11ad4ac456 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
@@ -19,11 +19,48 @@
 package org.apache.pinot.integration.tests;
 
 import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
 import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.util.TestUtils;
 
 
 public class ExactlyOnceKafkaRealtimeClusterIntegrationTest extends 
BaseRealtimeClusterIntegrationTest {
 
+  private static final int REALTIME_TABLE_CONFIG_RETRY_COUNT = 5;
+  private static final long REALTIME_TABLE_CONFIG_RETRY_WAIT_MS = 1_000L;
+  private static final long KAFKA_TOPIC_METADATA_READY_TIMEOUT_MS = 30_000L;
+
+  @Override
+  public void addTableConfig(TableConfig tableConfig)
+      throws IOException {
+    for (int attempt = 1; attempt <= REALTIME_TABLE_CONFIG_RETRY_COUNT; 
attempt++) {
+      try {
+        super.addTableConfig(tableConfig);
+        return;
+      } catch (IOException e) {
+        if (!isRetryableRealtimePartitionMetadataError(e) || attempt == 
REALTIME_TABLE_CONFIG_RETRY_COUNT) {
+          throw e;
+        }
+        waitForKafkaTopicMetadataReadyForConsumer(getKafkaTopic(), 
getNumKafkaPartitions());
+        try {
+          Thread.sleep(REALTIME_TABLE_CONFIG_RETRY_WAIT_MS);
+        } catch (InterruptedException interruptedException) {
+          Thread.currentThread().interrupt();
+          throw new IOException("Interrupted while retrying realtime table 
creation for topic: " + getKafkaTopic(),
+              interruptedException);
+        }
+      }
+    }
+  }
+
   @Override
   protected boolean useKafkaTransaction() {
     return true;
@@ -52,4 +89,39 @@ public class ExactlyOnceKafkaRealtimeClusterIntegrationTest 
extends BaseRealtime
         .pushAvroIntoKafkaWithTransaction(avroFiles, kafkaBrokerList, 
getKafkaTopic(),
             getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), 
getPartitionColumn(), true);
   }
+
+  private boolean isRetryableRealtimePartitionMetadataError(Throwable 
throwable) {
+    String errorToken = "Failed to fetch partition information for topic: " + 
getKafkaTopic();
+    Throwable current = throwable;
+    while (current != null) {
+      String message = current.getMessage();
+      if (message != null && message.contains(errorToken)) {
+        return true;
+      }
+      current = current.getCause();
+    }
+    return false;
+  }
+
+  private void waitForKafkaTopicMetadataReadyForConsumer(String topic, int 
expectedPartitions) {
+    TestUtils.waitForCondition(aVoid -> 
isKafkaTopicMetadataReadyForConsumer(topic, expectedPartitions), 200L,
+        KAFKA_TOPIC_METADATA_READY_TIMEOUT_MS,
+        "Kafka topic '" + topic + "' metadata is not visible to consumers");
+  }
+
+  private boolean isKafkaTopicMetadataReadyForConsumer(String topic, int 
expectedPartitions) {
+    Properties consumerProps = new Properties();
+    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
getKafkaBrokerList());
+    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, 
"pinot-kafka-topic-ready-" + UUID.randomUUID());
+    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+    consumerProps.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000");
+    consumerProps.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "5000");
+    try (KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(consumerProps)) {
+      List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic, 
Duration.ofSeconds(5));
+      return partitionInfos != null && partitionInfos.size() >= 
expectedPartitions;
+    } catch (Exception e) {
+      return false;
+    }
+  }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
index cac229e8bec..5714057a7a8 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
@@ -74,7 +74,7 @@ public abstract class CustomDataQueryClusterIntegrationTest 
extends BaseClusterI
     // Start the Pinot cluster
     startZk();
     LOGGER.warn("Start Kafka in the integration test suite");
-    startKafka();
+    startKafkaWithoutTopic();
     startController();
     startBroker();
     startServer();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to