This is an automated email from the ASF dual-hosted git repository.
mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6611ff8 Add config for enabling realtime offset based consumption
status checker (#7753)
6611ff8 is described below
commit 6611ff82f46ea837e6bd8edefd46ccc16d4e2002
Author: Sajjad Moradi <[email protected]>
AuthorDate: Mon Nov 15 12:05:32 2021 -0800
Add config for enabling realtime offset based consumption status checker
(#7753)
---
.../apache/pinot/common/utils/ServiceStatus.java | 21 ++++++++-------------
.../server/starter/helix/BaseServerStarter.java | 16 ++++++++++++----
.../org/apache/pinot/spi/utils/CommonConstants.java | 3 +++
3 files changed, 23 insertions(+), 17 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
index d50e399..45221fb 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
@@ -218,8 +218,6 @@ public class ServiceStatus {
private final Supplier<Integer>
_getNumConsumingSegmentsNotReachedTheirLatestOffset;
String _statusDescription = STATUS_DESCRIPTION_INIT;
- private boolean _consumptionNotYetCaughtUp = true;
-
/**
* Realtime consumption catchup service which adds a static wait time for
consuming segments to catchup
*/
@@ -242,22 +240,19 @@ public class ServiceStatus {
return _serviceStatus;
}
long now = System.currentTimeMillis();
- int numConsumingSegmentsNotCaughtUp =
_getNumConsumingSegmentsNotReachedTheirLatestOffset.get();
+ boolean isConsumingSegmentsCounterProvided =
_getNumConsumingSegmentsNotReachedTheirLatestOffset != null;
+ int numConsumingSegmentsNotCaughtUp =
+ isConsumingSegmentsCounterProvided ?
_getNumConsumingSegmentsNotReachedTheirLatestOffset.get() : -1;
if (now >= _endWaitTime) {
_statusDescription = String.format("Consuming segments status GOOD
since %dms "
+ "(numConsumingSegmentsNotCaughtUp=%d)", _endWaitTime,
numConsumingSegmentsNotCaughtUp);
return Status.GOOD;
}
- if (_consumptionNotYetCaughtUp && numConsumingSegmentsNotCaughtUp > 0) {
- // TODO: Once the performance of offset based consumption checker is
validated:
- // - remove the log line
- // - uncomment the status & statusDescription lines
- // - remove variable _consumptionNotYetCaughtUp
- _consumptionNotYetCaughtUp = false;
- LOGGER.info("All consuming segments have reached their latest offsets!
"
- + "Finished {} msec earlier than time threshold.", _endWaitTime -
now);
-// _statusDescription = "Consuming segments status GOOD as all consuming
segments have reached the latest offset";
-// return Status.GOOD;
+ if (isConsumingSegmentsCounterProvided &&
numConsumingSegmentsNotCaughtUp == 0) {
+ _statusDescription = String.format(
+ "Consuming segments status GOOD as all consuming segments have
reached the latest offset. "
+ + "Finished %d msec earlier than time threshold.",
_endWaitTime - now);
+ return Status.GOOD;
}
_statusDescription =
String.format("Waiting for consuming segments to catchup:
numConsumingSegmentsNotCaughtUp=%d, "
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index fe010ba..aaa1e2a 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.HelixAdmin;
@@ -221,6 +222,9 @@ public abstract class BaseServerStarter implements ServiceStartable {
int realtimeConsumptionCatchupWaitMs = _serverConf
.getProperty(Server.CONFIG_OF_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS,
Server.DEFAULT_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS);
+ boolean isOffsetBasedConsumptionStatusCheckerEnabled = _serverConf
+
.getProperty(Server.CONFIG_OF_ENABLE_REALTIME_OFFSET_BASED_CONSUMPTION_STATUS_CHECKER,
+
Server.DEFAULT_ENABLE_REALTIME_OFFSET_BASED_CONSUMPTION_STATUS_CHECKER);
// collect all resources which have this instance in the ideal state
List<String> resourcesToMonitor = new ArrayList<>();
@@ -265,12 +269,16 @@ public abstract class BaseServerStarter implements ServiceStartable {
_instanceId, resourcesToMonitor, minResourcePercentForStartup));
boolean foundConsuming = !consumingSegments.isEmpty();
if (checkRealtime && foundConsuming) {
- OffsetBasedConsumptionStatusChecker consumptionStatusChecker =
- new
OffsetBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(),
consumingSegments);
+ Supplier<Integer> getNumConsumingSegmentsNotReachedTheirLatestOffset =
null;
+ if (isOffsetBasedConsumptionStatusCheckerEnabled) {
+ OffsetBasedConsumptionStatusChecker consumptionStatusChecker =
+ new
OffsetBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(),
consumingSegments);
+ getNumConsumingSegmentsNotReachedTheirLatestOffset =
+
consumptionStatusChecker::getNumConsumingSegmentsNotReachedTheirLatestOffset;
+ }
serviceStatusCallbackListBuilder.add(
new
ServiceStatus.RealtimeConsumptionCatchupServiceStatusCallback(_helixManager,
_helixClusterName,
- _instanceId, realtimeConsumptionCatchupWaitMs,
-
consumptionStatusChecker::getNumConsumingSegmentsNotReachedTheirLatestOffset));
+ _instanceId, realtimeConsumptionCatchupWaitMs,
getNumConsumingSegmentsNotReachedTheirLatestOffset));
}
LOGGER.info("Registering service status handler");
ServiceStatus.setServiceStatusCallback(_instanceId,
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 1402417..58c175e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -296,6 +296,9 @@ public class CommonConstants {
public static final String
CONFIG_OF_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS =
"pinot.server.starter.realtimeConsumptionCatchupWaitMs";
public static final int
DEFAULT_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS = 0;
+ public static final String
CONFIG_OF_ENABLE_REALTIME_OFFSET_BASED_CONSUMPTION_STATUS_CHECKER =
+
"pinot.server.starter.enableRealtimeOffsetBasedConsumptionStatusChecker";
+ public static final boolean
DEFAULT_ENABLE_REALTIME_OFFSET_BASED_CONSUMPTION_STATUS_CHECKER = false;
public static final String DEFAULT_READ_MODE = "mmap";
// Whether to reload consuming segment on scheme update
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]