This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 555e5a0443 Combine the read access for replication config (#9849)
555e5a0443 is described below
commit 555e5a04439c614bea8915c9e69327d7390260de
Author: Seunghyun Lee <[email protected]>
AuthorDate: Wed Nov 23 22:38:10 2022 -0800
Combine the read access for replication config (#9849)
* Combine the read access for replication config
Currently, we have two separate configurations for replication:
offline and HLC tables read from `replication`, while LLC tables read
from `replicasPerPartition`. This PR combines the read access for
the replication config.
* Addressed comments
---
.../assignment/InstanceAssignmentConfigUtils.java | 2 +-
.../pinot/common/utils/config/TableConfigTest.java | 68 +++++++++++++++++++---
.../api/resources/PinotTableRestletResource.java | 5 +-
.../controller/helix/SegmentStatusChecker.java | 7 +--
.../helix/core/PinotHelixResourceManager.java | 13 +----
.../helix/core/PinotTableIdealStateBuilder.java | 16 ++---
.../assignment/segment/BaseSegmentAssignment.java | 7 +--
.../segment/OfflineSegmentAssignment.java | 6 --
.../segment/RealtimeSegmentAssignment.java | 6 --
.../BalancedNumSegmentAssignmentStrategy.java | 7 +--
.../ReplicaGroupSegmentAssignmentStrategy.java | 6 +-
.../realtime/PinotLLCRealtimeSegmentManager.java | 2 +-
.../api/PinotTableRestletResourceTest.java | 5 +-
.../api/TableConfigsRestletResourceTest.java | 4 +-
.../controller/helix/PinotResourceManagerTest.java | 7 ++-
...altimeNonReplicaGroupSegmentAssignmentTest.java | 7 ++-
...NonReplicaGroupTieredSegmentAssignmentTest.java | 5 +-
.../RealtimeReplicaGroupSegmentAssignmentTest.java | 5 +-
.../SegmentAssignmentStrategyFactoryTest.java | 12 ++--
.../helix/core/retention/RetentionManagerTest.java | 8 ++-
.../segment/local/utils/TableConfigUtils.java | 14 ++---
.../SegmentsValidationAndRetentionConfig.java | 12 ++++
.../apache/pinot/spi/config/table/TableConfig.java | 28 +++++++++
.../apache/pinot/tools/PinotNumReplicaChanger.java | 2 +-
.../command/RealtimeProvisioningHelperCommand.java | 2 +-
25 files changed, 158 insertions(+), 98 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
index 8689b06e22..6a0ae1188e 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
@@ -106,7 +106,7 @@ public class InstanceAssignmentConfigUtils {
InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig;
SegmentsValidationAndRetentionConfig segmentConfig =
tableConfig.getValidationConfig();
- int numReplicaGroups = segmentConfig.getReplicationNumber();
+ int numReplicaGroups = tableConfig.getReplication();
ReplicaGroupStrategyConfig replicaGroupStrategyConfig =
segmentConfig.getReplicaGroupStrategyConfig();
Preconditions.checkState(replicaGroupStrategyConfig != null, "Failed to
find the replica-group strategy config");
String partitionColumn = replicaGroupStrategyConfig.getPartitionColumn();
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigTest.java
index ffeef06aca..100cd62c59 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigTest.java
@@ -22,31 +22,38 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertTrue;
public class TableConfigTest {
+ private static final String TEST_OFFLINE_TABLE_NAME = "testllc_OFFLINE";
+ private static final String TEST_REALTIME_HLC_TABLE_NAME =
"testhlc_REALTIME";
+ private static final String TEST_REALTIME_LLC_TABLE_NAME =
"testllc_REALTIME";
+
@DataProvider
public Object[][] configs()
throws IOException {
try (Stream<Path> configs =
Files.list(Paths.get("src/test/resources/testConfigs"))) {
return configs.map(path -> {
- try {
- return Files.readAllBytes(path);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- })
- .map(config -> new Object[]{config})
- .toArray(Object[][]::new);
+ try {
+ return Files.readAllBytes(path);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }).map(config -> new Object[]{config}).toArray(Object[][]::new);
}
}
@@ -56,4 +63,49 @@ public class TableConfigTest {
TableConfig tableConfig =
JsonUtils.DEFAULT_READER.forType(TableConfig.class).readValue(config);
assertTrue(StringUtils.isNotBlank(tableConfig.getTableName()));
}
+
+ @Test
+ public void testGetReplication() {
+ TableConfig offlineTableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(TEST_OFFLINE_TABLE_NAME).setNumReplicas(2).build();
+ assertEquals(2, offlineTableConfig.getReplication());
+
+ offlineTableConfig.getValidationConfig().setReplication("4");
+ assertEquals(4, offlineTableConfig.getReplication());
+
+ offlineTableConfig.getValidationConfig().setReplicasPerPartition("3");
+ assertEquals(4, offlineTableConfig.getReplication());
+
+ TableConfig realtimeHLCTableConfig =
+ new
TableConfigBuilder(TableType.REALTIME).setTableName(TEST_REALTIME_HLC_TABLE_NAME)
+
.setStreamConfigs(getStreamConfigMap("highlevel")).setNumReplicas(2).build();
+ assertEquals(2, realtimeHLCTableConfig.getReplication());
+
+ realtimeHLCTableConfig.getValidationConfig().setReplication("4");
+ assertEquals(4, realtimeHLCTableConfig.getReplication());
+
+ realtimeHLCTableConfig.getValidationConfig().setReplicasPerPartition("3");
+ assertEquals(4, realtimeHLCTableConfig.getReplication());
+
+ TableConfig realtimeLLCTableConfig =
+ new
TableConfigBuilder(TableType.REALTIME).setTableName(TEST_REALTIME_LLC_TABLE_NAME)
+
.setStreamConfigs(getStreamConfigMap("lowlevel")).setLLC(true).setNumReplicas(2).build();
+
+ assertEquals(2, realtimeLLCTableConfig.getReplication());
+
+ realtimeLLCTableConfig.getValidationConfig().setReplication("4");
+ assertEquals(2, realtimeLLCTableConfig.getReplication());
+
+ realtimeLLCTableConfig.getValidationConfig().setReplicasPerPartition("3");
+ assertEquals(3, realtimeLLCTableConfig.getReplication());
+ }
+
+ private Map<String, String> getStreamConfigMap(String consumerType) {
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put("streamType", "kafka");
+ configMap.put("stream.kafka.consumer.type", consumerType);
+ configMap.put("stream.kafka.topic.name", "test");
+ configMap.put("stream.kafka.decoder.class.name", "test");
+ return configMap;
+ }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 532a5e66cb..54f9b57250 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -98,7 +98,6 @@ import
org.apache.pinot.controller.util.TableIngestionStatusHelper;
import org.apache.pinot.controller.util.TableMetadataReader;
import org.apache.pinot.core.auth.ManualAuthorization;
import org.apache.pinot.segment.local.utils.TableConfigUtils;
-import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableStats;
import org.apache.pinot.spi.config.table.TableStatus;
@@ -811,9 +810,7 @@ public class PinotTableRestletResource {
String tableNameWithType =
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager,
tableName, tableType, LOGGER).get(0);
TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
- SegmentsValidationAndRetentionConfig segmentsConfig =
- tableConfig != null ? tableConfig.getValidationConfig() : null;
- int numReplica = segmentsConfig == null ? 1 :
Integer.parseInt(segmentsConfig.getReplication());
+ int numReplica = tableConfig == null ? 1 : tableConfig.getReplication();
String segmentsMetadata;
try {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index bcdcf876f0..941bb8f0b1 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -143,12 +143,7 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask<SegmentStatusCh
_controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.REPLICATION_FROM_CONFIG, 0);
return;
}
- int replication;
- if (tableConfig.getTableType() == TableType.REALTIME) {
- replication =
tableConfig.getValidationConfig().getReplicasPerPartitionNumber();
- } else {
- replication = tableConfig.getValidationConfig().getReplicationNumber();
- }
+ int replication = tableConfig.getReplication();
_controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.REPLICATION_FROM_CONFIG, replication);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index d2b2947293..efb209795e 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -136,7 +136,6 @@ import
org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
import org.apache.pinot.controller.helix.starter.HelixConfig;
-import org.apache.pinot.segment.local.utils.ReplicationUtils;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.ConfigUtils;
import org.apache.pinot.spi.config.instance.Instance;
@@ -1474,7 +1473,6 @@ public class PinotHelixResourceManager {
}
validateTableTenantConfig(tableConfig);
- SegmentsValidationAndRetentionConfig segmentsConfig =
tableConfig.getValidationConfig();
TableType tableType = tableConfig.getTableType();
switch (tableType) {
@@ -1482,7 +1480,7 @@ public class PinotHelixResourceManager {
// now lets build an ideal state
LOGGER.info("building empty ideal state for table : " +
tableNameWithType);
final IdealState offlineIdealState =
PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType,
- Integer.parseInt(segmentsConfig.getReplication()),
_enableBatchMessageMode);
+ tableConfig.getReplication(), _enableBatchMessageMode);
LOGGER.info("adding table via the admin");
try {
@@ -1795,7 +1793,7 @@ public class PinotHelixResourceManager {
// Update IdealState replication
IdealState idealState =
_helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
- String replicationConfigured = segmentsConfig.getReplication();
+ String replicationConfigured =
Integer.toString(tableConfig.getReplication());
if (!idealState.getReplicas().equals(replicationConfigured)) {
HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, is
-> {
assert is != null;
@@ -3742,12 +3740,7 @@ public class PinotHelixResourceManager {
Set<String> serverInstances =
getAllInstancesForServerTenant(tenantConfig.getServer());
return serverInstances.size();
}
-
- if (ReplicationUtils.useReplicasPerPartition(tableConfig)) {
- return
Integer.parseInt(tableConfig.getValidationConfig().getReplicasPerPartition());
- }
-
- return tableConfig.getValidationConfig().getReplicationNumber();
+ return tableConfig.getReplication();
}
/**
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index beb4f796de..ac24151d67 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -92,31 +92,25 @@ public class PinotTableIdealStateBuilder {
List<String> realtimeInstances =
HelixHelper.getInstancesWithTag(helixManager,
TagNameUtils.extractConsumingServerTag(realtimeTableConfig.getTenantConfig()));
IdealState idealState = buildEmptyRealtimeIdealStateFor(realtimeTableName,
1, enableBatchMessageMode);
- if (realtimeInstances.size() %
Integer.parseInt(realtimeTableConfig.getValidationConfig().getReplication()) !=
0) {
+ if (realtimeInstances.size() % realtimeTableConfig.getReplication() != 0) {
throw new RuntimeException(
"Number of instance in current tenant should be an integer multiples
of the number of replications");
}
setupInstanceConfigForHighLevelConsumer(realtimeTableName,
realtimeInstances.size(),
-
Integer.parseInt(realtimeTableConfig.getValidationConfig().getReplication()),
- IngestionConfigUtils.getStreamConfigMap(realtimeTableConfig),
zkHelixPropertyStore, realtimeInstances);
+ realtimeTableConfig.getReplication(),
IngestionConfigUtils.getStreamConfigMap(realtimeTableConfig),
+ zkHelixPropertyStore, realtimeInstances);
return idealState;
}
public static void
buildLowLevelRealtimeIdealStateFor(PinotLLCRealtimeSegmentManager
pinotLLCRealtimeSegmentManager,
String realtimeTableName, TableConfig realtimeTableConfig, IdealState
idealState,
boolean enableBatchMessageMode) {
-
// Validate replicasPerPartition here.
- final String replicasPerPartitionStr =
realtimeTableConfig.getValidationConfig().getReplicasPerPartition();
- if (replicasPerPartitionStr == null || replicasPerPartitionStr.isEmpty()) {
- throw new RuntimeException("Null or empty value for
replicasPerPartition, expected a number");
- }
final int nReplicas;
try {
- nReplicas = Integer.valueOf(replicasPerPartitionStr);
+ nReplicas = realtimeTableConfig.getReplication();
} catch (NumberFormatException e) {
- throw new InvalidTableConfigException(
- "Invalid value for replicasPerPartition, expected a number: " +
replicasPerPartitionStr, e);
+ throw new InvalidTableConfigException("Invalid value for
replicasPerPartition, expected a number.", e);
}
if (idealState == null) {
idealState = buildEmptyRealtimeIdealStateFor(realtimeTableName,
nReplicas, enableBatchMessageMode);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java
index c91efb904a..fc66bce53e 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java
@@ -75,7 +75,7 @@ public abstract class BaseSegmentAssignment implements
SegmentAssignment {
_helixManager = helixManager;
_tableNameWithType = tableConfig.getTableName();
_tableConfig = tableConfig;
- _replication = getReplication(tableConfig);
+ _replication = tableConfig.getReplication();
ReplicaGroupStrategyConfig replicaGroupStrategyConfig =
tableConfig.getValidationConfig().getReplicaGroupStrategyConfig();
_partitionColumn = replicaGroupStrategyConfig != null ?
replicaGroupStrategyConfig.getPartitionColumn() : null;
@@ -89,11 +89,6 @@ public abstract class BaseSegmentAssignment implements
SegmentAssignment {
}
}
- /**
- * Returns the replication of the table.
- */
- protected abstract int getReplication(TableConfig tableConfig);
-
/**
* Rebalances tiers and returns a pair of tier assignments and non-tier
assignment.
*/
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
index ec04b728fb..36f784515a 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
@@ -30,7 +30,6 @@ import org.apache.pinot.common.tier.Tier;
import
org.apache.pinot.controller.helix.core.assignment.segment.strategy.AllServersSegmentAssignmentStrategy;
import
org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategy;
import
org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategyFactory;
-import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.utils.RebalanceConfigConstants;
@@ -40,11 +39,6 @@ import org.apache.pinot.spi.utils.RebalanceConfigConstants;
*/
public class OfflineSegmentAssignment extends BaseSegmentAssignment {
- @Override
- protected int getReplication(TableConfig tableConfig) {
- return tableConfig.getValidationConfig().getReplicationNumber();
- }
-
@Override
public List<String> assignSegment(String segmentName, Map<String,
Map<String, String>> currentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
index 4f44aa1e44..c3e4f4c239 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
@@ -31,7 +31,6 @@ import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.tier.Tier;
import
org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategy;
import
org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategyFactory;
-import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.RebalanceConfigConstants;
@@ -74,11 +73,6 @@ import org.apache.pinot.spi.utils.RebalanceConfigConstants;
*/
public class RealtimeSegmentAssignment extends BaseSegmentAssignment {
- @Override
- protected int getReplication(TableConfig tableConfig) {
- return tableConfig.getValidationConfig().getReplicasPerPartitionNumber();
- }
-
@Override
public List<String> assignSegment(String segmentName, Map<String,
Map<String, String>> currentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java
index 1d26abc296..e9c540da78 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java
@@ -26,7 +26,6 @@ import org.apache.pinot.common.assignment.InstancePartitions;
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,11 +50,7 @@ public class BalancedNumSegmentAssignmentStrategy implements
SegmentAssignmentSt
_tableNameWithType = tableConfig.getTableName();
SegmentsValidationAndRetentionConfig validationAndRetentionConfig =
tableConfig.getValidationConfig();
Preconditions.checkState(validationAndRetentionConfig != null, "Validation
Config is null");
- // Number of replicas per partition of low-level consumers check is for
the real time tables only
- // TODO: Cleanup required once we fetch the replication number from table
config depending on table type
- _replication = tableConfig.getTableType() == TableType.REALTIME
- ? validationAndRetentionConfig.getReplicasPerPartitionNumber()
- : validationAndRetentionConfig.getReplicationNumber();
+ _replication = tableConfig.getReplication();
LOGGER.info("Initialized BalancedNumSegmentAssignmentStrategy for table: "
+ "{} with replication: {}",
_tableNameWithType, _replication);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java
index 94069dc8c2..d5a4d0e027 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java
@@ -53,11 +53,7 @@ class ReplicaGroupSegmentAssignmentStrategy implements
SegmentAssignmentStrategy
_tableName = tableConfig.getTableName();
SegmentsValidationAndRetentionConfig validationAndRetentionConfig =
tableConfig.getValidationConfig();
Preconditions.checkState(validationAndRetentionConfig != null, "Validation
Config is null");
- // Number of replicas per partition of low-level consumers check is for
the real time tables only
- // TODO: Cleanup required once we fetch the replication number from table
config depending on table type
- _replication = tableConfig.getTableType() == TableType.REALTIME
- ? validationAndRetentionConfig.getReplicasPerPartitionNumber()
- : validationAndRetentionConfig.getReplicationNumber();
+ _replication = tableConfig.getReplication();
ReplicaGroupStrategyConfig replicaGroupStrategyConfig =
validationAndRetentionConfig.getReplicaGroupStrategyConfig();
_partitionColumn = replicaGroupStrategyConfig != null ?
replicaGroupStrategyConfig.getPartitionColumn() : null;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 762ffdc421..b7ab051ab2 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -1339,7 +1339,7 @@ public class PinotLLCRealtimeSegmentManager {
private int getNumReplicas(TableConfig tableConfig, InstancePartitions
instancePartitions) {
if (instancePartitions.getNumReplicaGroups() == 1) {
// Non-replica-group based
- return tableConfig.getValidationConfig().getReplicasPerPartitionNumber();
+ return tableConfig.getReplication();
} else {
// Replica-group based
return instancePartitions.getNumReplicaGroups();
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
index 2b85ed477d..8339e3025f 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
@@ -229,7 +229,7 @@ public class PinotTableRestletResourceTest extends
ControllerTest {
sendPostRequest(_createTableUrl, tableJSONConfigString);
// table creation should succeed
TableConfig tableConfig = getTableConfig(tableName, "OFFLINE");
- assertEquals(tableConfig.getValidationConfig().getReplicationNumber(),
+ assertEquals(tableConfig.getReplication(),
Math.max(tableReplication, DEFAULT_MIN_NUM_REPLICAS));
DEFAULT_INSTANCE.addDummySchema(tableName);
@@ -237,8 +237,7 @@ public class PinotTableRestletResourceTest extends
ControllerTest {
_realtimeBuilder.setTableName(tableName).setNumReplicas(tableReplication).build().toJsonString();
sendPostRequest(_createTableUrl, tableJSONConfigString);
tableConfig = getTableConfig(tableName, "REALTIME");
- assertEquals(tableConfig.getValidationConfig().getReplicationNumber(),
- Math.max(tableReplication, DEFAULT_MIN_NUM_REPLICAS));
+ assertEquals(tableConfig.getReplication(), Math.max(tableReplication,
DEFAULT_MIN_NUM_REPLICAS));
DEFAULT_INSTANCE.getHelixResourceManager().deleteOfflineTable(tableName);
DEFAULT_INSTANCE.getHelixResourceManager().deleteRealtimeTable(tableName);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableConfigsRestletResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableConfigsRestletResourceTest.java
index fad767d123..a24e774981 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableConfigsRestletResourceTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableConfigsRestletResourceTest.java
@@ -306,9 +306,9 @@ public class TableConfigsRestletResourceTest extends
ControllerTest {
response =
sendGetRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableConfigsGet(tableName));
tableConfigsResponse = JsonUtils.stringToObject(response,
TableConfigs.class);
Assert.assertEquals(tableConfigsResponse.getTableName(), tableName);
-
Assert.assertEquals(tableConfigsResponse.getOffline().getValidationConfig().getReplicationNumber(),
+ Assert.assertEquals(tableConfigsResponse.getOffline().getReplication(),
DEFAULT_MIN_NUM_REPLICAS);
-
Assert.assertEquals(tableConfigsResponse.getRealtime().getValidationConfig().getReplicasPerPartitionNumber(),
+ Assert.assertEquals(tableConfigsResponse.getRealtime().getReplication(),
DEFAULT_MIN_NUM_REPLICAS);
sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableConfigsDelete(tableName));
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
index 6e4230c39a..445e43a871 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
@@ -79,15 +79,16 @@ public class PinotResourceManagerTest {
Schema dummySchema = TEST_INSTANCE.createDummySchema(invalidRealtimeTable);
TEST_INSTANCE.addSchema(dummySchema);
- Map<String, String> streamConfigs =
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
// Missing replicasPerPartition
TableConfig invalidRealtimeTableConfig =
- new
TableConfigBuilder(TableType.REALTIME).setStreamConfigs(streamConfigs).setTableName(invalidRealtimeTable)
+ new
TableConfigBuilder(TableType.REALTIME).setTableName(invalidRealtimeTable)
.setSchemaName(dummySchema.getSchemaName()).build();
+
try {
TEST_INSTANCE.getHelixResourceManager().addTable(invalidRealtimeTableConfig);
Assert.fail(
- "Table creation should have thrown exception due to missing
replicasPerPartition in validation config");
+ "Table creation should have thrown exception due to missing stream
config and replicasPerPartition in "
+ + "validation config");
} catch (Exception e) {
// expected
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
index 4e20967949..2bf6fd11e0 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
@@ -30,6 +30,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
@@ -79,9 +80,10 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest {
System.currentTimeMillis()).getSegmentName());
}
+ Map<String, String> streamConfigs =
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
TableConfig tableConfig =
new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
- .setLLC(true).build();
+ .setLLC(true).setStreamConfigs(streamConfigs).build();
_segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(),
tableConfig);
_instancePartitionsMap = new TreeMap<>();
@@ -113,9 +115,10 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest {
@Test
public void testReplicationForSegmentAssignment() {
+ Map<String, String> streamConfigs =
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
TableConfig tableConfig =
new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
- .setLLC(true).build();
+ .setLLC(true).setStreamConfigs(streamConfigs).build();
// Update the replication by changing the NUM_REPLICAS_PER_PARTITION
tableConfig.getValidationConfig().setReplicasPerPartition(NUM_REPLICAS_PER_PARTITION);
SegmentAssignment segmentAssignment =
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
index c3f1ae30fe..73cb94c4fa 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
@@ -33,6 +33,7 @@ import org.apache.pinot.common.tier.Tier;
import org.apache.pinot.common.tier.TierFactory;
import org.apache.pinot.common.tier.TierSegmentSelector;
import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TierConfig;
@@ -117,9 +118,11 @@ public class RealtimeNonReplicaGroupTieredSegmentAssignmentTest {
TierFactory.PINOT_SERVER_STORAGE_TYPE, TAG_B_NAME, null, null),
new TierConfig(TIER_C_NAME, TierFactory.TIME_SEGMENT_SELECTOR_TYPE,
"30d", null,
TierFactory.PINOT_SERVER_STORAGE_TYPE, TAG_C_NAME, null, null));
+
+ Map<String, String> streamConfigs =
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
TableConfig tableConfig =
new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
- .setTierConfigList(tierConfigList).setLLC(true).build();
+
.setTierConfigList(tierConfigList).setLLC(true).setStreamConfigs(streamConfigs).build();
_segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(null,
tableConfig);
_instancePartitionsMap = new TreeMap<>();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
index 11dbe233ae..713b4c442a 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
@@ -31,6 +31,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -83,9 +84,11 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
System.currentTimeMillis()).getSegmentName());
}
+ Map<String, String> streamConfigs =
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
TableConfig tableConfig =
new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
-
.setLLC(true).setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)
+ .setLLC(true).setStreamConfigs(streamConfigs)
+
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)
.setReplicaGroupStrategyConfig(new
ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1)).build();
_segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(),
tableConfig);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/SegmentAssignmentStrategyFactoryTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/SegmentAssignmentStrategyFactoryTest.java
index b0f12240f5..619d61ef82 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/SegmentAssignmentStrategyFactoryTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/SegmentAssignmentStrategyFactoryTest.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import org.apache.pinot.common.assignment.InstancePartitions;
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentTestUtils;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -98,14 +99,15 @@ public class SegmentAssignmentStrategyFactoryTest {
@Test
public void testBalancedNumSegmentAssignmentStrategyForRealtimeTables() {
- TableConfig tableConfig =
- new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setLLC(true).build();
+ Map<String, String> streamConfigs =
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setLLC(true)
+ .setStreamConfigs(streamConfigs).build();
InstancePartitions instancePartitions = new
InstancePartitions(INSTANCE_PARTITIONS_NAME);
instancePartitions.setInstances(0, 0, INSTANCES);
- SegmentAssignmentStrategy segmentAssignmentStrategy =
SegmentAssignmentStrategyFactory
- .getSegmentAssignmentStrategy(null, tableConfig,
InstancePartitionsType.COMPLETED.toString(),
- instancePartitions);
+ SegmentAssignmentStrategy segmentAssignmentStrategy =
+ SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(null,
tableConfig,
+ InstancePartitionsType.COMPLETED.toString(), instancePartitions);
Assert.assertNotNull(segmentAssignmentStrategy);
Assert.assertTrue(segmentAssignmentStrategy instanceof
BalancedNumSegmentAssignmentStrategy);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index b5d1162c53..85aba90f36 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.retention;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixAdmin;
import org.apache.helix.model.IdealState;
@@ -34,6 +35,7 @@ import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
import org.apache.pinot.controller.helix.core.SegmentDeletionManager;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
@@ -152,8 +154,10 @@ public class RetentionManagerTest {
}
private TableConfig createRealtimeTableConfig1(int replicaCount) {
+ Map<String, String> streamConfigs =
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
return new
TableConfigBuilder(TableType.REALTIME).setTableName(TEST_TABLE_NAME).setLLC(true)
-
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("5").setNumReplicas(replicaCount).build();
+
.setStreamConfigs(streamConfigs).setRetentionTimeUnit("DAYS").setRetentionTimeValue("5")
+ .setNumReplicas(replicaCount).build();
}
private void setupPinotHelixResourceManager(TableConfig tableConfig, final
List<String> removedSegments,
@@ -233,7 +237,7 @@ public class RetentionManagerTest {
private PinotHelixResourceManager setupSegmentMetadata(TableConfig
tableConfig, final long now, final int nSegments,
List<String> segmentsToBeDeleted) {
- final int replicaCount =
Integer.valueOf(tableConfig.getValidationConfig().getReplicasPerPartition());
+ final int replicaCount = tableConfig.getReplication();
List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 2f15d9b2b4..7aaf01c787 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -972,6 +972,9 @@ public final class TableConfigUtils {
}
/**
+ * TODO: After deprecating "replicasPerPartition", we can change this
function's behavior to always overwrite
+ * config to "replication" only.
+ *
* Ensure that the table config has the minimum number of replicas set as
per cluster configs.
* If is doesn't, set the required amount of replication in the table config
*/
@@ -992,7 +995,7 @@ public final class TableConfigUtils {
if (verifyReplication) {
int requestReplication;
try {
- requestReplication = segmentsConfig.getReplicationNumber();
+ requestReplication = tableConfig.getReplication();
if (requestReplication < defaultTableMinReplicas) {
LOGGER.info("Creating table with minimum replication factor of: {}
instead of requested replication: {}",
defaultTableMinReplicas, requestReplication);
@@ -1004,12 +1007,9 @@ public final class TableConfigUtils {
}
if (verifyReplicasPerPartition) {
- String replicasPerPartitionStr =
segmentsConfig.getReplicasPerPartition();
- if (replicasPerPartitionStr == null) {
- throw new IllegalStateException("Field replicasPerPartition needs to
be specified");
- }
+ int replicasPerPartition;
try {
- int replicasPerPartition = Integer.parseInt(replicasPerPartitionStr);
+ replicasPerPartition = tableConfig.getReplication();
if (replicasPerPartition < defaultTableMinReplicas) {
LOGGER.info(
"Creating table with minimum replicasPerPartition of: {} instead
of requested replicasPerPartition: {}",
@@ -1017,7 +1017,7 @@ public final class TableConfigUtils {
segmentsConfig.setReplicasPerPartition(String.valueOf(defaultTableMinReplicas));
}
} catch (NumberFormatException e) {
- throw new IllegalStateException("Invalid value for
replicasPerPartition: '" + replicasPerPartitionStr + "'", e);
+ throw new IllegalStateException("Invalid replicasPerPartition number",
e);
}
}
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
index 9595c879c4..849c1e8902 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
@@ -124,6 +124,9 @@ public class SegmentsValidationAndRetentionConfig extends BaseJsonConfig {
_segmentPushType = segmentPushType;
}
+ /**
+ * Try to Use {@link TableConfig#getReplication()}
+ */
public String getReplication() {
return _replication;
}
@@ -142,6 +145,9 @@ public class SegmentsValidationAndRetentionConfig extends BaseJsonConfig {
_schemaName = schemaName;
}
+ /**
+ * Try to Use {@link TableConfig#getReplication()}
+ */
public String getReplicasPerPartition() {
return _replicasPerPartition;
}
@@ -166,11 +172,17 @@ public class SegmentsValidationAndRetentionConfig extends BaseJsonConfig {
_completionConfig = completionConfig;
}
+ /**
+ * Try to Use {@link TableConfig#getReplication()}
+ */
@JsonIgnore
public int getReplicationNumber() {
return Integer.parseInt(_replication);
}
+ /**
+ * Try to Use {@link TableConfig#getReplication()}
+ */
@JsonIgnore
public int getReplicasPerPartitionNumber() {
return Integer.parseInt(_replicasPerPartition);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
index e9e0032cda..6c478b7995 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
@@ -31,6 +31,8 @@ import
org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.table.assignment.SegmentAssignmentConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -354,4 +356,30 @@ public class TableConfig extends BaseJsonConfig {
public void setSegmentAssignmentConfigMap(Map<String,
SegmentAssignmentConfig> segmentAssignmentConfigMap) {
_segmentAssignmentConfigMap = segmentAssignmentConfigMap;
}
+
+ @JsonIgnore
+ public int getReplication() {
+ int replication = 0;
+ if (_tableType == TableType.REALTIME) {
+ StreamConfig streamConfig = new StreamConfig(_tableName,
IngestionConfigUtils.getStreamConfigMap(this));
+ if (streamConfig.hasHighLevelConsumerType()) {
+ // In case of HLC, we read from "replication"
+ replication = Integer.parseInt(_validationConfig.getReplication());
+ } else {
+ // To keep the backward compatibility, we read from
"replicasPerPartition" in case of LLC
+ String replicasPerPartitionStr =
_validationConfig.getReplicasPerPartition();
+ try {
+ replication = Integer.parseInt(replicasPerPartitionStr);
+ } catch (NumberFormatException e) {
+ // If numReplicasPerPartition is not being used or specified, read
the value from replication
+ String replicationStr = _validationConfig.getReplication();
+ replication = Integer.parseInt(replicationStr);
+ }
+ }
+ } else {
+ // In case of OFFLINE tables, we read from "replication"
+ replication = Integer.parseInt(_validationConfig.getReplication());
+ }
+ return replication;
+ }
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotNumReplicaChanger.java b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotNumReplicaChanger.java
index 30a6284b73..528453115e 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotNumReplicaChanger.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotNumReplicaChanger.java
@@ -53,7 +53,7 @@ public class PinotNumReplicaChanger extends PinotZKChanger {
// Get the number of replicas in the tableconfig.
final String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
final TableConfig offlineTableConfig =
ZKMetadataProvider.getOfflineTableConfig(_propertyStore, offlineTableName);
- final int newNumReplicas =
Integer.parseInt(offlineTableConfig.getValidationConfig().getReplication());
+ final int newNumReplicas = offlineTableConfig.getReplication();
// Now get the idealstate, and get the number of replicas in it.
IdealState currentIdealState =
_helixAdmin.getResourceIdealState(_clusterName, offlineTableName);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java
index a3ca0caca0..d86dcc2a91 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java
@@ -221,7 +221,7 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand
StringBuilder note = new StringBuilder();
note.append("\nNote:\n");
- int numReplicas =
tableConfig.getValidationConfig().getReplicasPerPartitionNumber();
+ int numReplicas = tableConfig.getReplication();
int tableRetentionHours = (int)
TimeUnit.valueOf(tableConfig.getValidationConfig().getRetentionTimeUnit())
.toHours(Long.parseLong(tableConfig.getValidationConfig().getRetentionTimeValue()));
if (_retentionHours > 0) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]