This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c377392562a Fix TODO in RealtimeTableDataManager.java using BooleanSupplier (#17069)
c377392562a is described below

commit c377392562aec9ab081be49bc272be3e75764156
Author: Akanksha kedia <[email protected]>
AuthorDate: Fri Oct 24 22:08:30 2025 +0530

    Fix TODO in RealtimeTableDataManager.java using BooleanSupplier (#17069)
---
 .../pinot/core/data/manager/InstanceDataManager.java     |  4 ++--
 .../provider/DefaultTableDataManagerProvider.java        |  4 ++--
 .../data/manager/provider/TableDataManagerProvider.java  |  4 ++--
 .../data/manager/realtime/IngestionDelayTracker.java     | 16 ++++++++--------
 .../data/manager/realtime/RealtimeTableDataManager.java  |  8 +++-----
 .../data/manager/realtime/IngestionDelayTrackerTest.java |  4 ++--
 .../utils/FailureInjectingRealtimeTableDataManager.java  |  3 +--
 .../utils/FailureInjectingTableDataManagerProvider.java  |  4 ++--
 .../server/starter/helix/HelixInstanceDataManager.java   |  6 +++---
 9 files changed, 25 insertions(+), 28 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index ba5f93f5e6a..ed6e9bd402f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -21,7 +21,7 @@ package org.apache.pinot.core.data.manager;
 import java.io.File;
 import java.util.List;
 import java.util.Set;
-import java.util.function.Supplier;
+import java.util.function.BooleanSupplier;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.helix.HelixManager;
@@ -195,7 +195,7 @@ public interface InstanceDataManager {
    *
    * @param isServerReadyToServeQueries supplier to retrieve state of server.
    */
-  void setSupplierOfIsServerReadyToServeQueries(Supplier<Boolean> isServerReadyToServeQueries);
+  void setSupplierOfIsServerReadyToServeQueries(BooleanSupplier isServerReadyToServeQueries);
 
   /**
    * Returns consumer directory paths on the instance
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
index ba8ada89160..881b8b60bca 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
@@ -22,7 +22,7 @@ import com.google.common.cache.Cache;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
-import java.util.function.Supplier;
+import java.util.function.BooleanSupplier;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
@@ -69,7 +69,7 @@ public class DefaultTableDataManagerProvider implements TableDataManagerProvider
       SegmentReloadSemaphore segmentReloadSemaphore, ExecutorService segmentReloadRefreshExecutor,
       @Nullable ExecutorService segmentPreloadExecutor,
       @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
-      Supplier<Boolean> isServerReadyToServeQueries, boolean enableAsyncSegmentRefresh) {
+      BooleanSupplier isServerReadyToServeQueries, boolean enableAsyncSegmentRefresh) {
     TableDataManager tableDataManager;
     switch (tableConfig.getTableType()) {
       case OFFLINE:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
index 1c558aaba6e..37bdc4627b4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
@@ -22,7 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.Cache;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.function.Supplier;
+import java.util.function.BooleanSupplier;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.HelixManager;
@@ -50,7 +50,7 @@ public interface TableDataManagerProvider {
       SegmentReloadSemaphore segmentRefreshSemaphore, ExecutorService segmentReloadRefreshExecutor,
       @Nullable ExecutorService segmentPreloadExecutor,
       @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
-      Supplier<Boolean> isServerReadyToServeQueries, boolean enableAsyncSegmentRefresh);
+      BooleanSupplier isServerReadyToServeQueries, boolean enableAsyncSegmentRefresh);
 
   @VisibleForTesting
   default TableDataManager getTableDataManager(TableConfig tableConfig, Schema schema) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
index cbdda8b8903..541de387f5a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
@@ -33,7 +33,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
+import java.util.function.BooleanSupplier;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMeter;
@@ -128,7 +128,7 @@ public class IngestionDelayTracker {
   private final String _tableNameWithType;
   private final String _metricName;
   private final RealtimeTableDataManager _realTimeTableDataManager;
-  private final Supplier<Boolean> _isServerReadyToServeQueries;
+  private final BooleanSupplier _isServerReadyToServeQueries;
   private final Cache<String, Boolean> _segmentsToIgnore =
       CacheBuilder.newBuilder().expireAfterAccess(IGNORED_SEGMENT_CACHE_TIME_MINUTES, TimeUnit.MINUTES).build();
   // Map to describe the partitions for which the metrics are being reported.
@@ -167,7 +167,7 @@ public class IngestionDelayTracker {
   @VisibleForTesting
   public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithType,
       RealtimeTableDataManager realtimeTableDataManager, long metricsRemovalIntervalMs, long metricsTrackingIntervalMs,
-      Supplier<Boolean> isServerReadyToServeQueries) {
+      BooleanSupplier isServerReadyToServeQueries) {
     _serverMetrics = serverMetrics;
     _tableNameWithType = tableNameWithType;
     _metricName = tableNameWithType;
@@ -222,7 +222,7 @@ public class IngestionDelayTracker {
   private void trackIngestionDelay() {
     long startMs = System.currentTimeMillis();
     try {
-      if (!_isServerReadyToServeQueries.get() || _realTimeTableDataManager.isShutDown()) {
+      if (!_isServerReadyToServeQueries.getAsBoolean() || _realTimeTableDataManager.isShutDown()) {
         // Do not update the ingestion delay metrics during server startup period
         // or once the table data manager has been shutdown.
         return;
@@ -397,7 +397,7 @@ public class IngestionDelayTracker {
    */
   public void updateMetrics(String segmentName, int partitionId, long ingestionTimeMs,
       @Nullable StreamPartitionMsgOffset currentOffset) {
-    if (!_isServerReadyToServeQueries.get() || _realTimeTableDataManager.isShutDown()) {
+    if (!_isServerReadyToServeQueries.getAsBoolean() || _realTimeTableDataManager.isShutDown()) {
       // Do not update the ingestion delay metrics during server startup period
       // or once the table data manager has been shutdown.
       return;
@@ -469,7 +469,7 @@ public class IngestionDelayTracker {
    * This call is to be invoked by a scheduled executor thread that will periodically wake up and invoke this function.
    */
   public void timeoutInactivePartitions() {
-    if (!_isServerReadyToServeQueries.get()) {
+    if (!_isServerReadyToServeQueries.getAsBoolean()) {
       // Do not update the tracker state during server startup period
       return;
     }
@@ -502,7 +502,7 @@ public class IngestionDelayTracker {
    * the segment is still hosted by this server after some interval of time.
    */
   public void markPartitionForVerification(String segmentName) {
-    if (!_isServerReadyToServeQueries.get() || _segmentsToIgnore.getIfPresent(segmentName) != null) {
+    if (!_isServerReadyToServeQueries.getAsBoolean() || _segmentsToIgnore.getIfPresent(segmentName) != null) {
       // Do not update the tracker state during server startup period or if the segment is marked to be ignored
       return;
     }
@@ -627,7 +627,7 @@ public class IngestionDelayTracker {
         LOGGER.error("Failed to close streamMetadataProvider", e);
       }
     }
-    if (!_isServerReadyToServeQueries.get()) {
+    if (!_isServerReadyToServeQueries.getAsBoolean()) {
       // Do not update the tracker state during server startup period
       return;
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index c8fdb110f20..ae406381070 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -36,7 +36,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
 import java.util.function.BooleanSupplier;
-import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.collections4.CollectionUtils;
@@ -135,8 +134,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
   @Deprecated
   private static final String SEGMENT_DOWNLOAD_TIMEOUT_MINUTES = "segmentDownloadTimeoutMinutes";
 
-  // TODO: Change it to BooleanSupplier
-  private final Supplier<Boolean> _isServerReadyToServeQueries;
+  private final BooleanSupplier _isServerReadyToServeQueries;
 
   // Object to track ingestion delay for all partitions
   private IngestionDelayTracker _ingestionDelayTracker;
@@ -150,7 +148,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     this(segmentBuildSemaphore, () -> true);
   }
 
-  public RealtimeTableDataManager(Semaphore segmentBuildSemaphore, Supplier<Boolean> isServerReadyToServeQueries) {
+  public RealtimeTableDataManager(Semaphore segmentBuildSemaphore, BooleanSupplier isServerReadyToServeQueries) {
     _segmentBuildSemaphore = segmentBuildSemaphore;
     _isServerReadyToServeQueries = isServerReadyToServeQueries;
   }
@@ -918,7 +916,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     _enforceConsumptionInOrder = enforceConsumptionInOrder;
   }
 
-  public Supplier<Boolean> getIsServerReadyToServeQueries() {
+  public BooleanSupplier getIsServerReadyToServeQueries() {
     return _isServerReadyToServeQueries;
   }
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
index 4356a857763..857a99d3764 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
@@ -32,7 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
+import java.util.function.BooleanSupplier;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -96,7 +96,7 @@ public class IngestionDelayTrackerTest {
 
     public MockIngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithType,
         RealtimeTableDataManager realtimeTableDataManager, int timerThreadTickIntervalMs, int metricTrackingIntervalMs,
-        Supplier<Boolean> isServerReadyToServeQueries) {
+        BooleanSupplier isServerReadyToServeQueries) {
       super(serverMetrics, tableNameWithType, realtimeTableDataManager, timerThreadTickIntervalMs,
           metricTrackingIntervalMs, isServerReadyToServeQueries);
     }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
index b85c316e768..1eb5d402f5f 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
@@ -21,7 +21,6 @@ package org.apache.pinot.integration.tests.realtime.utils;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BooleanSupplier;
-import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.LLCSegmentName;
@@ -46,7 +45,7 @@ public class FailureInjectingRealtimeTableDataManager extends RealtimeTableDataM
   }
 
   public FailureInjectingRealtimeTableDataManager(Semaphore segmentBuildSemaphore,
-      Supplier<Boolean> isServerReadyToServeQueries,
+      BooleanSupplier isServerReadyToServeQueries,
       @Nullable FailureInjectingTableConfig failureInjectingTableConfig) {
     super(segmentBuildSemaphore, isServerReadyToServeQueries);
     _failureInjectingTableConfig = failureInjectingTableConfig;
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
index e382e1ca2dc..e0ea938438b 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
@@ -24,7 +24,7 @@ import com.google.common.cache.Cache;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
-import java.util.function.Supplier;
+import java.util.function.BooleanSupplier;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
@@ -76,7 +76,7 @@ public class FailureInjectingTableDataManagerProvider implements TableDataManage
       SegmentReloadSemaphore segmentReloadSemaphore, ExecutorService segmentReloadRefreshExecutor,
       @Nullable ExecutorService segmentPreloadExecutor,
       @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
-      Supplier<Boolean> isServerReadyToServeQueries, boolean enableAsyncSegmentRefresh) {
+      BooleanSupplier isServerReadyToServeQueries, boolean enableAsyncSegmentRefresh) {
     TableDataManager tableDataManager;
     switch (tableConfig.getTableType()) {
       case OFFLINE:
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index d1e26a02de5..98f89e5a854 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -36,7 +36,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
-import java.util.function.Supplier;
+import java.util.function.BooleanSupplier;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.io.FileUtils;
@@ -99,7 +99,7 @@ public class HelixInstanceDataManager implements InstanceDataManager {
   private HelixManager _helixManager;
   private ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private SegmentUploader _segmentUploader;
-  private Supplier<Boolean> _isServerReadyToServeQueries = () -> false;
+  private BooleanSupplier _isServerReadyToServeQueries = () -> false;
 
   // Fixed size LRU cache for storing last N errors on the instance.
   // Key is TableNameWithType-SegmentName pair.
@@ -117,7 +117,7 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
   private ExecutorService _segmentPreloadExecutor;
 
   @Override
-  public void setSupplierOfIsServerReadyToServeQueries(Supplier<Boolean> isServingQueries) {
+  public void setSupplierOfIsServerReadyToServeQueries(BooleanSupplier isServingQueries) {
     _isServerReadyToServeQueries = isServingQueries;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to