noob-se7en commented on code in PR #17602:
URL: https://github.com/apache/pinot/pull/17602#discussion_r2803011276
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java:
##########
@@ -55,16 +57,31 @@ public class SegmentBuildTimeLeaseExtender {
private final ServerSegmentCompletionProtocolHandler _protocolHandler;
public static void initExecutor() {
- _executor = new ScheduledThreadPoolExecutor(1);
- LOGGER.info("Initialized segment build time lease extender executor");
+ synchronized (EXECUTOR_LOCK) {
+ _executorRefCount++;
+ if (_executor == null) {
+ _executor = new ScheduledThreadPoolExecutor(1);
+ LOGGER.info("Initialized segment build time lease extender executor");
+ }
+ }
}
public static void shutdownExecutor() {
- if (_executor != null) {
- _executor.shutdownNow();
- _executor = null;
+ synchronized (EXECUTOR_LOCK) {
+ if (_executorRefCount > 0) {
+ _executorRefCount--;
+ }
+ if (_executorRefCount == 0 && _executor != null) {
+ _executor.shutdownNow();
+ _executor = null;
+ LOGGER.info("Shut down segment build time lease extender executor");
+ } else if (_executor != null) {
+ LOGGER.debug("Skipping lease extender executor shutdown because {}
manager(s) are still active",
+ _executorRefCount);
+ } else {
+ LOGGER.debug("Lease extender executor is already shut down");
+ }
Review Comment:
Why are these changes required? Seems like unrelated change to this PR
##########
pinot-distribution/pinot-assembly.xml:
##########
@@ -48,10 +48,18 @@
<!-- Start Include Pinot Stream Ingestion Plugins-->
<file>
<source>
-
${pinot.root}/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/target/pinot-kafka-2.0-${project.version}-shaded.jar
+
${pinot.root}/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/target/pinot-kafka-3.0-${project.version}-shaded.jar
</source>
<destName>
-
plugins/pinot-stream-ingestion/pinot-kafka-2.0/pinot-kafka-2.0-${project.version}-shaded.jar
+
plugins/pinot-stream-ingestion/pinot-kafka-3.0/pinot-kafka-3.0-${project.version}-shaded.jar
+ </destName>
+ </file>
+ <file>
+ <source>
+
${pinot.root}/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/target/pinot-kafka-4.0-${project.version}-shaded.jar
+ </source>
+ <destName>
+
plugins/pinot-stream-ingestion/pinot-kafka-4.0/pinot-kafka-4.0-${project.version}-shaded.jar
Review Comment:
Is this intended to go in this PR?
##########
pinot-plugins/pinot-input-format/pinot-protobuf/pom.xml:
##########
@@ -53,6 +53,10 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
Review Comment:
why is this required?
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/pom.xml:
##########
@@ -46,6 +46,11 @@
<artifactId>kafka-clients</artifactId>
<version>${kafka3.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-server</artifactId>
+ <version>${kafka3.version}</version>
+ </dependency>
Review Comment:
this is required here?
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java:
##########
@@ -84,7 +84,7 @@ public Set<Integer> fetchPartitionIds(long timeoutMillis) {
try {
List<PartitionInfo> partitionInfos = _consumer.partitionsFor(_topic,
Duration.ofMillis(timeoutMillis));
if (CollectionUtils.isEmpty(partitionInfos)) {
- throw new RuntimeException(String.format("Failed to fetch partition
information for topic: %s", _topic));
+ throw transientPartitionInfoUnavailable();
Review Comment:
This might cause a behaviour change since transientConsumer exception is
handled differently by callers.
If its not required then lets remove this
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/server/KafkaServerStartable.java:
##########
@@ -0,0 +1,701 @@
+/**
Review Comment:
This is lot of production code just for quickstart convenience. Can this be
moved inside pinot-tools ?
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProvider.java:
##########
@@ -84,7 +84,7 @@ public Set<Integer> fetchPartitionIds(long timeoutMillis) {
try {
List<PartitionInfo> partitionInfos = _consumer.partitionsFor(_topic,
Duration.ofMillis(timeoutMillis));
if (CollectionUtils.isEmpty(partitionInfos)) {
- throw new RuntimeException(String.format("Failed to fetch partition
information for topic: %s", _topic));
+ throw transientPartitionInfoUnavailable();
Review Comment:
same comments if this can cause behaviour change
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]