xiangfu0 commented on code in PR #17602:
URL: https://github.com/apache/pinot/pull/17602#discussion_r2803831436


##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProvider.java:
##########
@@ -84,7 +84,7 @@ public Set<Integer> fetchPartitionIds(long timeoutMillis) {
     try {
      List<PartitionInfo> partitionInfos = _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis));
       if (CollectionUtils.isEmpty(partitionInfos)) {
-        throw new RuntimeException(String.format("Failed to fetch partition information for topic: %s", _topic));
+        throw transientPartitionInfoUnavailable();

Review Comment:
   Also reverted this in the kafka-4.0 provider to match kafka-3.0 and to avoid changing caller retry semantics for non-transient empty-partition cases.
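For context, the exception type matters because callers typically retry only failures they classify as transient, so swapping a plain RuntimeException for a "transient" type widens the set of cases that get retried. A minimal stand-alone sketch of that caller-side distinction (the `TransientFetchException` class and `fetchWithRetry` helper below are illustrative, not the actual Pinot code):

```java
import java.util.Collections;
import java.util.List;

public class RetrySemanticsSketch {
  // Illustrative stand-in for a transient-failure marker type.
  static class TransientFetchException extends RuntimeException {
    TransientFetchException(String msg) { super(msg); }
  }

  // A caller that retries transient failures but fails fast on anything else.
  static List<Integer> fetchWithRetry(int maxAttempts, boolean transientFailure) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return fetchPartitionIds(transientFailure);
      } catch (TransientFetchException e) {
        if (attempt == maxAttempts) {
          throw e; // give up after the last attempt
        }
        // otherwise: loop and retry
      }
      // Any other RuntimeException propagates immediately: no retry.
    }
    return Collections.emptyList(); // unreachable
  }

  static List<Integer> fetchPartitionIds(boolean transientFailure) {
    if (transientFailure) {
      throw new TransientFetchException("partition info temporarily unavailable");
    }
    throw new RuntimeException("Failed to fetch partition information for topic: test");
  }

  public static void main(String[] args) {
    try {
      fetchWithRetry(3, false);
    } catch (RuntimeException e) {
      // Non-transient failure surfaces on the first attempt: no retries happen.
      System.out.println("fail-fast: " + e.getMessage());
    }
    try {
      fetchWithRetry(3, true);
    } catch (TransientFetchException e) {
      // Transient failure is retried up to maxAttempts before propagating.
      System.out.println("retried-then-gave-up: " + e.getMessage());
    }
  }
}
```

Reverting to the plain RuntimeException keeps empty-partition results on the fail-fast path, matching the kafka-3.0 behavior.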



##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/pom.xml:
##########
@@ -46,6 +46,11 @@
       <artifactId>kafka-clients</artifactId>
       <version>${kafka3.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-server</artifactId>
+      <version>${kafka3.version}</version>
+    </dependency>

Review Comment:
   Not required. Removed in the latest update (dropped org.apache.kafka:kafka-server from pinot-kafka-3.0).



##########
pinot-distribution/pinot-assembly.xml:
##########
@@ -48,10 +48,18 @@
     <!-- Start Include Pinot Stream Ingestion Plugins-->
    <file>
      <source>
-        ${pinot.root}/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/target/pinot-kafka-2.0-${project.version}-shaded.jar
+        ${pinot.root}/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/target/pinot-kafka-3.0-${project.version}-shaded.jar
      </source>
      <destName>
-        plugins/pinot-stream-ingestion/pinot-kafka-2.0/pinot-kafka-2.0-${project.version}-shaded.jar
+        plugins/pinot-stream-ingestion/pinot-kafka-3.0/pinot-kafka-3.0-${project.version}-shaded.jar
+      </destName>
+    </file>
+    <file>
+      <source>
+        ${pinot.root}/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/target/pinot-kafka-4.0-${project.version}-shaded.jar
+      </source>
+      <destName>
+        plugins/pinot-stream-ingestion/pinot-kafka-4.0/pinot-kafka-4.0-${project.version}-shaded.jar

Review Comment:
   Good catch. Removed the kafka-4.0 shaded jar entry from pinot-assembly in the latest update, so this PR stays focused on kafka-2.0 removal + kafka-3.0 standardization.



##########
pinot-plugins/pinot-input-format/pinot-protobuf/pom.xml:
##########
@@ -53,6 +53,10 @@
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>

Review Comment:
   This is required because the pinot-protobuf main sources use org.slf4j.Logger/LoggerFactory directly (ProtoBufUtils and MessageCodeGen). Without slf4j-api declared directly on the compile classpath, pinot-protobuf fails to compile with 'package org.slf4j does not exist'.
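   For reference, the resulting declaration in pinot-protobuf/pom.xml is just the direct dependency from the diff above, with the version expected to come from the parent's dependencyManagement (a sketch of the pom fragment, assuming the parent manages the slf4j version):

```xml
<!-- Declared directly because main sources import org.slf4j.Logger;
     relying on a transitive path would make compilation fragile.
     Version is assumed to be inherited from dependencyManagement. -->
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-api</artifactId>
</dependency>
```

   Maven resolves main-source compilation against compile-scoped dependencies, so a logger used directly in main code should be a direct dependency rather than one reached transitively.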



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java:
##########
@@ -55,16 +57,31 @@ public class SegmentBuildTimeLeaseExtender {
   private final ServerSegmentCompletionProtocolHandler _protocolHandler;
 
   public static void initExecutor() {
-    _executor = new ScheduledThreadPoolExecutor(1);
-    LOGGER.info("Initialized segment build time lease extender executor");
+    synchronized (EXECUTOR_LOCK) {
+      _executorRefCount++;
+      if (_executor == null) {
+        _executor = new ScheduledThreadPoolExecutor(1);
+        LOGGER.info("Initialized segment build time lease extender executor");
+      }
+    }
   }
 
   public static void shutdownExecutor() {
-    if (_executor != null) {
-      _executor.shutdownNow();
-      _executor = null;
+    synchronized (EXECUTOR_LOCK) {
+      if (_executorRefCount > 0) {
+        _executorRefCount--;
+      }
+      if (_executorRefCount == 0 && _executor != null) {
+        _executor.shutdownNow();
+        _executor = null;
+        LOGGER.info("Shut down segment build time lease extender executor");
+      } else if (_executor != null) {
+        LOGGER.debug("Skipping lease extender executor shutdown because {} manager(s) are still active",
+            _executorRefCount);
+      } else {
+        LOGGER.debug("Lease extender executor is already shut down");
+      }

Review Comment:
   I validated this specifically by reverting it locally; that immediately 
reintroduced NPEs in PartialUpsertTableRebalanceIntegrationTest from 
SegmentBuildTimeLeaseExtender._executor being null (addSegment -> 
scheduleWithFixedDelay). The ref-counted lifecycle avoids premature executor 
shutdown while another realtime table/test still uses it.
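   The ref-counted lifecycle can be sketched in isolation as follows (a minimal stand-alone version of the pattern, not the actual SegmentBuildTimeLeaseExtender class): balanced init/shutdown calls keep the shared executor alive until the last user releases it.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;

// Minimal sketch of a ref-counted shared executor: each user calls init() on
// startup and shutdown() on teardown; the executor is only torn down when the
// ref count drops to zero, so one user's shutdown cannot break another's tasks.
public class RefCountedExecutor {
  private static final Object LOCK = new Object();
  private static ScheduledThreadPoolExecutor _executor;
  private static int _refCount = 0;

  public static void init() {
    synchronized (LOCK) {
      _refCount++;
      if (_executor == null) {
        _executor = new ScheduledThreadPoolExecutor(1);
      }
    }
  }

  public static void shutdown() {
    synchronized (LOCK) {
      if (_refCount > 0) {
        _refCount--;
      }
      if (_refCount == 0 && _executor != null) {
        _executor.shutdownNow();
        _executor = null;
      }
      // With a positive ref count, the executor stays alive for remaining users.
    }
  }

  public static boolean isAlive() {
    synchronized (LOCK) {
      return _executor != null;
    }
  }

  public static void main(String[] args) {
    init();       // user A (e.g. first realtime table manager)
    init();       // user B (e.g. second realtime table manager)
    shutdown();   // A releases; B is still active, so the executor survives
    System.out.println("after first shutdown: " + isAlive());
    shutdown();   // B releases; ref count hits zero, executor is torn down
    System.out.println("after second shutdown: " + isAlive());
  }
}
```

   Without the ref count, the first shutdown() would null out the executor while user B still schedules tasks on it, which is exactly the NPE pattern seen in the integration test.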



##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/server/KafkaServerStartable.java:
##########
@@ -0,0 +1,701 @@
+/**

Review Comment:
   I evaluated moving this to pinot-tools. Current blocker is 
StreamDataProvider SPI loading: StartKafkaCommand and integration setup resolve 
startables from stream plugin modules. Moving this class to tools would break 
that discovery path unless we redesign the SPI/loader contract. Happy to split 
that redesign into a follow-up PR.
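   For context on the discovery path in question, the general SPI mechanism works roughly like this (a generic java.util.ServiceLoader sketch, not Pinot's actual StreamDataProvider code; the `ServerStartable` interface below is hypothetical): implementations are discovered via META-INF/services entries on the classpath, so moving a class out of the plugin module removes it from that lookup.

```java
import java.util.ServiceLoader;

// Generic ServiceLoader sketch: providers are registered through
// META-INF/services/<interface-name> files on the classpath. If the
// implementation class moves to a module that is not on the plugin
// classpath, the loader simply no longer finds it.
public class SpiLookupSketch {
  // Hypothetical service interface standing in for a server-startable SPI.
  public interface ServerStartable {
    void start();
  }

  public static ServerStartable lookup() {
    ServiceLoader<ServerStartable> loader = ServiceLoader.load(ServerStartable.class);
    for (ServerStartable startable : loader) {
      return startable; // first provider found on the classpath wins
    }
    return null; // no provider registered: discovery fails
  }

  public static void main(String[] args) {
    // This sketch ships no META-INF/services registration, so lookup fails,
    // mirroring what happens when the implementation leaves the plugin module.
    System.out.println("provider found: " + (lookup() != null));
  }
}
```

   Redesigning the SPI/loader contract so tools-hosted startables remain discoverable is the follow-up work mentioned above.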



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

