This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 24e01c1c1b Add Broker Node Level Config:
newSegmentExpirationTimeInSeconds (#13686)
24e01c1c1b is described below
commit 24e01c1c1b45efba1063b4bc718b2335ec1da6b8
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Fri Jul 26 22:06:02 2024 +0530
Add Broker Node Level Config: newSegmentExpirationTimeInSeconds (#13686)
---
.../instanceselector/BalancedInstanceSelector.java | 5 +++--
.../instanceselector/BaseInstanceSelector.java | 9 ++++++---
.../routing/instanceselector/InstanceSelector.java | 6 +++++-
.../instanceselector/InstanceSelectorFactory.java | 11 +++++++----
.../MultiStageReplicaGroupSelector.java | 5 +++--
.../ReplicaGroupInstanceSelector.java | 5 +++--
.../StrictReplicaGroupInstanceSelector.java | 5 +++--
.../instanceselector/InstanceSelectorTest.java | 20 +++++++++++---------
.../org/apache/pinot/spi/utils/CommonConstants.java | 4 ++++
9 files changed, 45 insertions(+), 25 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
index c827369907..9ffa8367b8 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
@@ -50,8 +50,9 @@ public class BalancedInstanceSelector extends
BaseInstanceSelector {
public BalancedInstanceSelector(String tableNameWithType,
ZkHelixPropertyStore<ZNRecord> propertyStore,
BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector
adaptiveServerSelector, Clock clock,
- boolean useFixedReplica) {
- super(tableNameWithType, propertyStore, brokerMetrics,
adaptiveServerSelector, clock, useFixedReplica);
+ boolean useFixedReplica, long newSegmentExpirationTimeInSeconds) {
+ super(tableNameWithType, propertyStore, brokerMetrics,
adaptiveServerSelector, clock, useFixedReplica,
+ newSegmentExpirationTimeInSeconds);
}
@Override
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
index 3cef77fac4..4849a60fa0 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
@@ -90,6 +90,7 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
final AdaptiveServerSelector _adaptiveServerSelector;
final Clock _clock;
final boolean _useFixedReplica;
+ final long _newSegmentExpirationTimeInSeconds;
final int _tableNameHashForFixedReplicaRouting;
// These 3 variables are the cached states to help accelerate the change
processing
@@ -104,13 +105,14 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
BaseInstanceSelector(String tableNameWithType,
ZkHelixPropertyStore<ZNRecord> propertyStore,
BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector
adaptiveServerSelector, Clock clock,
- boolean useFixedReplica) {
+ boolean useFixedReplica, long newSegmentExpirationTimeInSeconds) {
_tableNameWithType = tableNameWithType;
_propertyStore = propertyStore;
_brokerMetrics = brokerMetrics;
_adaptiveServerSelector = adaptiveServerSelector;
_clock = clock;
_useFixedReplica = useFixedReplica;
+ _newSegmentExpirationTimeInSeconds = newSegmentExpirationTimeInSeconds;
// Using raw table name to ensure queries spanning across REALTIME and
OFFLINE tables are routed to the same
// instance
// Math.abs(Integer.MIN_VALUE) = Integer.MIN_VALUE, so we use & 0x7FFFFFFF
to get a positive value
@@ -170,7 +172,7 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
}
SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(record);
long creationTimeMs =
SegmentUtils.getSegmentCreationTimeMs(segmentZKMetadata);
- if (InstanceSelector.isNewSegment(creationTimeMs, currentTimeMs)) {
+ if (InstanceSelector.isNewSegment(creationTimeMs, currentTimeMs,
_newSegmentExpirationTimeInSeconds * 1000)) {
newSegmentCreationTimeMap.put(segmentZKMetadata.getSegmentName(),
creationTimeMs);
}
}
@@ -400,7 +402,8 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
long creationTimeMs = 0;
if (newSegmentState != null) {
// It was a new segment before, check the creation time and segment
state to see if it is still a new segment
- if (InstanceSelector.isNewSegment(newSegmentState.getCreationTimeMs(),
currentTimeMs)) {
+ if (InstanceSelector.isNewSegment(newSegmentState.getCreationTimeMs(),
currentTimeMs,
+ _newSegmentExpirationTimeInSeconds * 1000)) {
creationTimeMs = newSegmentState.getCreationTimeMs();
}
} else if (!_oldSegmentCandidatesMap.containsKey(segment)) {
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
index d003723c5b..b5fe944bce 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
@@ -36,7 +36,11 @@ public interface InstanceSelector {
long NEW_SEGMENT_EXPIRATION_MILLIS = TimeUnit.MINUTES.toMillis(5);
static boolean isNewSegment(long creationTimeMs, long currentTimeMs) {
- return creationTimeMs > 0 && currentTimeMs - creationTimeMs <=
NEW_SEGMENT_EXPIRATION_MILLIS;
+ return isNewSegment(creationTimeMs, currentTimeMs,
NEW_SEGMENT_EXPIRATION_MILLIS);
+ }
+
+ static boolean isNewSegment(long creationTimeMs, long currentTimeMs, long
newSegmentExpirationMillis) {
+ return creationTimeMs > 0 && currentTimeMs - creationTimeMs <=
newSegmentExpirationMillis;
}
/**
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
index 2ccb1a9ac1..895731f1dd 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
@@ -63,6 +63,9 @@ public class InstanceSelectorFactory {
RoutingConfig routingConfig = tableConfig.getRoutingConfig();
boolean useFixedReplica =
brokerConfig.getProperty(CommonConstants.Broker.CONFIG_OF_USE_FIXED_REPLICA,
CommonConstants.Broker.DEFAULT_USE_FIXED_REPLICA);
+ long newSegmentExpirationTimeInSeconds =
+
brokerConfig.getProperty(CommonConstants.Broker.CONFIG_OF_NEW_SEGMENT_EXPIRATION_SECONDS,
+
CommonConstants.Broker.DEFAULT_VALUE_OF_NEW_SEGMENT_EXPIRATION_SECONDS);
if (routingConfig != null) {
if (routingConfig.getUseFixedReplica() != null) {
// table config overrides broker config
@@ -74,22 +77,22 @@ public class InstanceSelectorFactory {
&&
LEGACY_REPLICA_GROUP_REALTIME_ROUTING.equalsIgnoreCase(routingConfig.getRoutingTableBuilderName())))
{
LOGGER.info("Using ReplicaGroupInstanceSelector for table: {}",
tableNameWithType);
return new ReplicaGroupInstanceSelector(tableNameWithType,
propertyStore, brokerMetrics, adaptiveServerSelector,
- clock, useFixedReplica);
+ clock, useFixedReplica, newSegmentExpirationTimeInSeconds);
}
if
(RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(
routingConfig.getInstanceSelectorType())) {
LOGGER.info("Using StrictReplicaGroupInstanceSelector for table: {}",
tableNameWithType);
return new StrictReplicaGroupInstanceSelector(tableNameWithType,
propertyStore, brokerMetrics,
- adaptiveServerSelector, clock, useFixedReplica);
+ adaptiveServerSelector, clock, useFixedReplica,
newSegmentExpirationTimeInSeconds);
}
if
(RoutingConfig.MULTI_STAGE_REPLICA_GROUP_SELECTOR_TYPE.equalsIgnoreCase(
routingConfig.getInstanceSelectorType())) {
LOGGER.info("Using {} for table: {}",
routingConfig.getInstanceSelectorType(), tableNameWithType);
return new MultiStageReplicaGroupSelector(tableNameWithType,
propertyStore, brokerMetrics,
- adaptiveServerSelector, clock, useFixedReplica);
+ adaptiveServerSelector, clock, useFixedReplica,
newSegmentExpirationTimeInSeconds);
}
}
return new BalancedInstanceSelector(tableNameWithType, propertyStore,
brokerMetrics, adaptiveServerSelector, clock,
- useFixedReplica);
+ useFixedReplica, newSegmentExpirationTimeInSeconds);
}
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
index b27450426d..22dfc8096a 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
@@ -59,8 +59,9 @@ public class MultiStageReplicaGroupSelector extends
BaseInstanceSelector {
public MultiStageReplicaGroupSelector(String tableNameWithType,
ZkHelixPropertyStore<ZNRecord> propertyStore,
BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector
adaptiveServerSelector, Clock clock,
- boolean useFixedReplica) {
- super(tableNameWithType, propertyStore, brokerMetrics,
adaptiveServerSelector, clock, useFixedReplica);
+ boolean useFixedReplica, long newSegmentExpirationTimeInSeconds) {
+ super(tableNameWithType, propertyStore, brokerMetrics,
adaptiveServerSelector, clock, useFixedReplica,
+ newSegmentExpirationTimeInSeconds);
}
@Override
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
index 0e9bd52d42..c766791f2d 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
@@ -65,8 +65,9 @@ public class ReplicaGroupInstanceSelector extends
BaseInstanceSelector {
public ReplicaGroupInstanceSelector(String tableNameWithType,
ZkHelixPropertyStore<ZNRecord> propertyStore,
BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector
adaptiveServerSelector, Clock clock,
- boolean useFixedReplica) {
- super(tableNameWithType, propertyStore, brokerMetrics,
adaptiveServerSelector, clock, useFixedReplica);
+ boolean useFixedReplica, long newSegmentExpirationTimeInSeconds) {
+ super(tableNameWithType, propertyStore, brokerMetrics,
adaptiveServerSelector, clock, useFixedReplica,
+ newSegmentExpirationTimeInSeconds);
}
@Override
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
index 95e83ea31c..a8fae76ef1 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
@@ -72,8 +72,9 @@ public class StrictReplicaGroupInstanceSelector extends
ReplicaGroupInstanceSele
public StrictReplicaGroupInstanceSelector(String tableNameWithType,
ZkHelixPropertyStore<ZNRecord> propertyStore,
BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector
adaptiveServerSelector, Clock clock,
- boolean useFixedReplica) {
- super(tableNameWithType, propertyStore, brokerMetrics,
adaptiveServerSelector, clock, useFixedReplica);
+ boolean useFixedReplica, long newSegmentExpirationTimeInSeconds) {
+ super(tableNameWithType, propertyStore, brokerMetrics,
adaptiveServerSelector, clock, useFixedReplica,
+ newSegmentExpirationTimeInSeconds);
}
/**
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
index f4edc28e67..cc4f355369 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
@@ -234,13 +234,14 @@ public class InstanceSelectorTest {
ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
BalancedInstanceSelector balancedInstanceSelector =
- new BalancedInstanceSelector(offlineTableName, propertyStore,
brokerMetrics, null, Clock.systemUTC(), false);
+ new BalancedInstanceSelector(offlineTableName, propertyStore,
brokerMetrics, null, Clock.systemUTC(),
+ false, 300);
ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
new ReplicaGroupInstanceSelector(offlineTableName, propertyStore,
brokerMetrics, null, Clock.systemUTC(),
- false);
+ false, 300);
StrictReplicaGroupInstanceSelector strictReplicaGroupInstanceSelector =
new StrictReplicaGroupInstanceSelector(offlineTableName,
propertyStore, brokerMetrics, null, Clock.systemUTC(),
- false);
+ false, 300);
Set<String> enabledInstances = new HashSet<>();
IdealState idealState = new IdealState(offlineTableName);
@@ -761,7 +762,7 @@ public class InstanceSelectorTest {
ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
new ReplicaGroupInstanceSelector(offlineTableName, propertyStore,
brokerMetrics, null, Clock.systemUTC(),
- false);
+ false, 300);
Set<String> enabledInstances = new HashSet<>();
IdealState idealState = new IdealState(offlineTableName);
@@ -844,7 +845,7 @@ public class InstanceSelectorTest {
ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
new ReplicaGroupInstanceSelector(offlineTableName, propertyStore,
brokerMetrics, null, Clock.systemUTC(),
- false);
+ false, 300);
Set<String> enabledInstances = new HashSet<>();
IdealState idealState = new IdealState(offlineTableName);
@@ -927,7 +928,7 @@ public class InstanceSelectorTest {
ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
new ReplicaGroupInstanceSelector(offlineTableName, propertyStore,
brokerMetrics, null, Clock.systemUTC(),
- false);
+ false, 300);
Set<String> enabledInstances = new HashSet<>();
IdealState idealState = new IdealState(offlineTableName);
@@ -1001,7 +1002,7 @@ public class InstanceSelectorTest {
MultiStageReplicaGroupSelector multiStageSelector =
new MultiStageReplicaGroupSelector(offlineTableName, propertyStore,
brokerMetrics, null, Clock.systemUTC(),
- false);
+ false, 300);
multiStageSelector = spy(multiStageSelector);
doReturn(instancePartitions).when(multiStageSelector).getInstancePartitions();
@@ -1096,11 +1097,12 @@ public class InstanceSelectorTest {
ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
BalancedInstanceSelector balancedInstanceSelector =
- new BalancedInstanceSelector(offlineTableName, propertyStore,
brokerMetrics, null, Clock.systemUTC(), false);
+ new BalancedInstanceSelector(offlineTableName, propertyStore,
brokerMetrics, null, Clock.systemUTC(),
+ false, 300);
// ReplicaGroupInstanceSelector has the same behavior as
BalancedInstanceSelector for the unavailable segments
StrictReplicaGroupInstanceSelector strictReplicaGroupInstanceSelector =
new StrictReplicaGroupInstanceSelector(offlineTableName,
propertyStore, brokerMetrics, null, Clock.systemUTC(),
- false);
+ false, 300);
Set<String> enabledInstances = new HashSet<>();
IdealState idealState = new IdealState(offlineTableName);
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 3753c0e64b..b6fec05b31 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList;
import java.io.File;
import java.math.BigDecimal;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.pinot.spi.config.instance.InstanceType;
@@ -344,6 +345,9 @@ public class CommonConstants {
// precedence over "query.response.size" (i.e., "query.response.size" will
be ignored).
public static final String CONFIG_OF_MAX_SERVER_RESPONSE_SIZE_BYTES =
"pinot.broker.max.server.response.size.bytes";
+ public static final String CONFIG_OF_NEW_SEGMENT_EXPIRATION_SECONDS =
"pinot.broker.new.segment.expiration.seconds";
+ public static final long DEFAULT_VALUE_OF_NEW_SEGMENT_EXPIRATION_SECONDS =
TimeUnit.MINUTES.toSeconds(5);
+
public static class Request {
public static final String SQL = "sql";
public static final String TRACE = "trace";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]