This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c377392562a Fix TODO in RealtimeTableDataManager.java using BooleanSupplier (#17069)
c377392562a is described below
commit c377392562aec9ab081be49bc272be3e75764156
Author: Akanksha kedia <[email protected]>
AuthorDate: Fri Oct 24 22:08:30 2025 +0530
Fix TODO in RealtimeTableDataManager.java using BooleanSupplier (#17069)
---
.../pinot/core/data/manager/InstanceDataManager.java | 4 ++--
.../provider/DefaultTableDataManagerProvider.java | 4 ++--
.../data/manager/provider/TableDataManagerProvider.java | 4 ++--
.../data/manager/realtime/IngestionDelayTracker.java | 16 ++++++++--------
.../data/manager/realtime/RealtimeTableDataManager.java | 8 +++-----
.../data/manager/realtime/IngestionDelayTrackerTest.java | 4 ++--
.../utils/FailureInjectingRealtimeTableDataManager.java | 3 +--
.../utils/FailureInjectingTableDataManagerProvider.java | 4 ++--
.../server/starter/helix/HelixInstanceDataManager.java | 6 +++---
9 files changed, 25 insertions(+), 28 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index ba5f93f5e6a..ed6e9bd402f 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -21,7 +21,7 @@ package org.apache.pinot.core.data.manager;
import java.io.File;
import java.util.List;
import java.util.Set;
-import java.util.function.Supplier;
+import java.util.function.BooleanSupplier;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.helix.HelixManager;
@@ -195,7 +195,7 @@ public interface InstanceDataManager {
*
* @param isServerReadyToServeQueries supplier to retrieve state of server.
*/
- void setSupplierOfIsServerReadyToServeQueries(Supplier<Boolean>
isServerReadyToServeQueries);
+ void setSupplierOfIsServerReadyToServeQueries(BooleanSupplier
isServerReadyToServeQueries);
/**
* Returns consumer directory paths on the instance
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
index ba8ada89160..881b8b60bca 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
@@ -22,7 +22,7 @@ import com.google.common.cache.Cache;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
-import java.util.function.Supplier;
+import java.util.function.BooleanSupplier;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
@@ -69,7 +69,7 @@ public class DefaultTableDataManagerProvider implements
TableDataManagerProvider
SegmentReloadSemaphore segmentReloadSemaphore, ExecutorService
segmentReloadRefreshExecutor,
@Nullable ExecutorService segmentPreloadExecutor,
@Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
- Supplier<Boolean> isServerReadyToServeQueries, boolean
enableAsyncSegmentRefresh) {
+ BooleanSupplier isServerReadyToServeQueries, boolean
enableAsyncSegmentRefresh) {
TableDataManager tableDataManager;
switch (tableConfig.getTableType()) {
case OFFLINE:
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
index 1c558aaba6e..37bdc4627b4 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
@@ -22,7 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.function.Supplier;
+import java.util.function.BooleanSupplier;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.HelixManager;
@@ -50,7 +50,7 @@ public interface TableDataManagerProvider {
SegmentReloadSemaphore segmentRefreshSemaphore, ExecutorService
segmentReloadRefreshExecutor,
@Nullable ExecutorService segmentPreloadExecutor,
@Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
- Supplier<Boolean> isServerReadyToServeQueries, boolean
enableAsyncSegmentRefresh);
+ BooleanSupplier isServerReadyToServeQueries, boolean
enableAsyncSegmentRefresh);
@VisibleForTesting
default TableDataManager getTableDataManager(TableConfig tableConfig, Schema
schema) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
index cbdda8b8903..541de387f5a 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
@@ -33,7 +33,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
+import java.util.function.BooleanSupplier;
import javax.annotation.Nullable;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
@@ -128,7 +128,7 @@ public class IngestionDelayTracker {
private final String _tableNameWithType;
private final String _metricName;
private final RealtimeTableDataManager _realTimeTableDataManager;
- private final Supplier<Boolean> _isServerReadyToServeQueries;
+ private final BooleanSupplier _isServerReadyToServeQueries;
private final Cache<String, Boolean> _segmentsToIgnore =
CacheBuilder.newBuilder().expireAfterAccess(IGNORED_SEGMENT_CACHE_TIME_MINUTES,
TimeUnit.MINUTES).build();
// Map to describe the partitions for which the metrics are being reported.
@@ -167,7 +167,7 @@ public class IngestionDelayTracker {
@VisibleForTesting
public IngestionDelayTracker(ServerMetrics serverMetrics, String
tableNameWithType,
RealtimeTableDataManager realtimeTableDataManager, long
metricsRemovalIntervalMs, long metricsTrackingIntervalMs,
- Supplier<Boolean> isServerReadyToServeQueries) {
+ BooleanSupplier isServerReadyToServeQueries) {
_serverMetrics = serverMetrics;
_tableNameWithType = tableNameWithType;
_metricName = tableNameWithType;
@@ -222,7 +222,7 @@ public class IngestionDelayTracker {
private void trackIngestionDelay() {
long startMs = System.currentTimeMillis();
try {
- if (!_isServerReadyToServeQueries.get() ||
_realTimeTableDataManager.isShutDown()) {
+ if (!_isServerReadyToServeQueries.getAsBoolean() ||
_realTimeTableDataManager.isShutDown()) {
// Do not update the ingestion delay metrics during server startup
period
// or once the table data manager has been shutdown.
return;
@@ -397,7 +397,7 @@ public class IngestionDelayTracker {
*/
public void updateMetrics(String segmentName, int partitionId, long
ingestionTimeMs,
@Nullable StreamPartitionMsgOffset currentOffset) {
- if (!_isServerReadyToServeQueries.get() ||
_realTimeTableDataManager.isShutDown()) {
+ if (!_isServerReadyToServeQueries.getAsBoolean() ||
_realTimeTableDataManager.isShutDown()) {
// Do not update the ingestion delay metrics during server startup period
// or once the table data manager has been shutdown.
return;
@@ -469,7 +469,7 @@ public class IngestionDelayTracker {
* This call is to be invoked by a scheduled executor thread that will
periodically wake up and invoke this function.
*/
public void timeoutInactivePartitions() {
- if (!_isServerReadyToServeQueries.get()) {
+ if (!_isServerReadyToServeQueries.getAsBoolean()) {
// Do not update the tracker state during server startup period
return;
}
@@ -502,7 +502,7 @@ public class IngestionDelayTracker {
* the segment is still hosted by this server after some interval of time.
*/
public void markPartitionForVerification(String segmentName) {
- if (!_isServerReadyToServeQueries.get() ||
_segmentsToIgnore.getIfPresent(segmentName) != null) {
+ if (!_isServerReadyToServeQueries.getAsBoolean() ||
_segmentsToIgnore.getIfPresent(segmentName) != null) {
// Do not update the tracker state during server startup period or if
the segment is marked to be ignored
return;
}
@@ -627,7 +627,7 @@ public class IngestionDelayTracker {
LOGGER.error("Failed to close streamMetadataProvider", e);
}
}
- if (!_isServerReadyToServeQueries.get()) {
+ if (!_isServerReadyToServeQueries.getAsBoolean()) {
// Do not update the tracker state during server startup period
return;
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index c8fdb110f20..ae406381070 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -36,7 +36,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.function.BooleanSupplier;
-import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.collections4.CollectionUtils;
@@ -135,8 +134,7 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
@Deprecated
private static final String SEGMENT_DOWNLOAD_TIMEOUT_MINUTES =
"segmentDownloadTimeoutMinutes";
- // TODO: Change it to BooleanSupplier
- private final Supplier<Boolean> _isServerReadyToServeQueries;
+ private final BooleanSupplier _isServerReadyToServeQueries;
// Object to track ingestion delay for all partitions
private IngestionDelayTracker _ingestionDelayTracker;
@@ -150,7 +148,7 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
this(segmentBuildSemaphore, () -> true);
}
- public RealtimeTableDataManager(Semaphore segmentBuildSemaphore,
Supplier<Boolean> isServerReadyToServeQueries) {
+ public RealtimeTableDataManager(Semaphore segmentBuildSemaphore,
BooleanSupplier isServerReadyToServeQueries) {
_segmentBuildSemaphore = segmentBuildSemaphore;
_isServerReadyToServeQueries = isServerReadyToServeQueries;
}
@@ -918,7 +916,7 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
_enforceConsumptionInOrder = enforceConsumptionInOrder;
}
- public Supplier<Boolean> getIsServerReadyToServeQueries() {
+ public BooleanSupplier getIsServerReadyToServeQueries() {
return _isServerReadyToServeQueries;
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
index 4356a857763..857a99d3764 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
@@ -32,7 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
+import java.util.function.BooleanSupplier;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMetrics;
@@ -96,7 +96,7 @@ public class IngestionDelayTrackerTest {
public MockIngestionDelayTracker(ServerMetrics serverMetrics, String
tableNameWithType,
RealtimeTableDataManager realtimeTableDataManager, int
timerThreadTickIntervalMs, int metricTrackingIntervalMs,
- Supplier<Boolean> isServerReadyToServeQueries) {
+ BooleanSupplier isServerReadyToServeQueries) {
super(serverMetrics, tableNameWithType, realtimeTableDataManager,
timerThreadTickIntervalMs,
metricTrackingIntervalMs, isServerReadyToServeQueries);
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
index b85c316e768..1eb5d402f5f 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
@@ -21,7 +21,6 @@ package org.apache.pinot.integration.tests.realtime.utils;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
-import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
@@ -46,7 +45,7 @@ public class FailureInjectingRealtimeTableDataManager extends
RealtimeTableDataM
}
public FailureInjectingRealtimeTableDataManager(Semaphore
segmentBuildSemaphore,
- Supplier<Boolean> isServerReadyToServeQueries,
+ BooleanSupplier isServerReadyToServeQueries,
@Nullable FailureInjectingTableConfig failureInjectingTableConfig) {
super(segmentBuildSemaphore, isServerReadyToServeQueries);
_failureInjectingTableConfig = failureInjectingTableConfig;
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
index e382e1ca2dc..e0ea938438b 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
@@ -24,7 +24,7 @@ import com.google.common.cache.Cache;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
-import java.util.function.Supplier;
+import java.util.function.BooleanSupplier;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
@@ -76,7 +76,7 @@ public class FailureInjectingTableDataManagerProvider
implements TableDataManage
SegmentReloadSemaphore segmentReloadSemaphore, ExecutorService
segmentReloadRefreshExecutor,
@Nullable ExecutorService segmentPreloadExecutor,
@Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
- Supplier<Boolean> isServerReadyToServeQueries, boolean
enableAsyncSegmentRefresh) {
+ BooleanSupplier isServerReadyToServeQueries, boolean
enableAsyncSegmentRefresh) {
TableDataManager tableDataManager;
switch (tableConfig.getTableType()) {
case OFFLINE:
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index d1e26a02de5..98f89e5a854 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -36,7 +36,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
-import java.util.function.Supplier;
+import java.util.function.BooleanSupplier;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.io.FileUtils;
@@ -99,7 +99,7 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
private HelixManager _helixManager;
private ZkHelixPropertyStore<ZNRecord> _propertyStore;
private SegmentUploader _segmentUploader;
- private Supplier<Boolean> _isServerReadyToServeQueries = () -> false;
+ private BooleanSupplier _isServerReadyToServeQueries = () -> false;
// Fixed size LRU cache for storing last N errors on the instance.
// Key is TableNameWithType-SegmentName pair.
@@ -117,7 +117,7 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
private ExecutorService _segmentPreloadExecutor;
@Override
- public void setSupplierOfIsServerReadyToServeQueries(Supplier<Boolean>
isServingQueries) {
+ public void setSupplierOfIsServerReadyToServeQueries(BooleanSupplier
isServingQueries) {
_isServerReadyToServeQueries = isServingQueries;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]