Jackie-Jiang commented on code in PR #16791:
URL: https://github.com/apache/pinot/pull/16791#discussion_r2369958190
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -156,10 +203,29 @@ public synchronized void processClusterChange(ChangeType
changeType) {
}
private void processSegmentAssignmentChange() {
+ _globalLock.readLock().lock();
+ ThreadFactory threadFactory = new
ThreadFactoryBuilder().setNameFormat("async-broker-assignment-change-%d").build();
+ ExecutorService executorService =
Executors.newFixedThreadPool(_processSegmentAssignmentChangeNumThreads,
Review Comment:
Shall we keep this executor service to be re-used? Assignment change should
be very frequent and we don't need to re-create a new executor every time
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -461,6 +461,11 @@ public static class Broker {
"pinot.broker.enable.partition.metadata.manager";
public static final boolean DEFAULT_ENABLE_PARTITION_METADATA_MANAGER =
true;
+ public static final String
CONFIG_OF_ROUTING_ASSIGNMENT_CHANGE_PROCESS_PARALLELISM =
+ "pinot.broker.routing.assignment.change.process.parallelism";
+ public static final int
DEFAULT_ROUTING_PROCESS_SEGMENT_ASSIGNMENT_CHANGE_NUM_THREADS =
Review Comment:
(minor) Keep the name consistent
```suggestion
public static final int
DEFAULT_ROUTING_ASSIGNMENT_CHANGE_PROCESS_PARALLELISM =
```
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -168,7 +234,7 @@ private void processSegmentAssignmentChange() {
List<RoutingEntry> routingEntries = new ArrayList<>(numTables);
Review Comment:
Seems we only need the `tableName`. This should be able to reduce memory
footprint
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -177,38 +243,51 @@ private void processSegmentAssignmentChange() {
Stat[] externalViewStats = _zkDataAccessor.getStats(externalViewPaths,
AccessOption.PERSISTENT);
long fetchStatsEndTimeMs = System.currentTimeMillis();
- List<String> tablesToUpdate = new ArrayList<>();
+ ConcurrentLinkedQueue<String> tablesToUpdate = new
ConcurrentLinkedQueue<>();
+ List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < numTables; i++) {
- Stat idealStateStat = idealStateStats[i];
- Stat externalViewStat = externalViewStats[i];
+ final Stat idealStateStat = idealStateStats[i];
+ final Stat externalViewStat = externalViewStats[i];
if (idealStateStat != null && externalViewStat != null) {
- RoutingEntry routingEntry = routingEntries.get(i);
- if (idealStateStat.getVersion() !=
routingEntry.getLastUpdateIdealStateVersion()
- || externalViewStat.getVersion() !=
routingEntry.getLastUpdateExternalViewVersion()) {
- String tableNameWithType = routingEntry.getTableNameWithType();
- tablesToUpdate.add(tableNameWithType);
- try {
- IdealState idealState =
getIdealState(routingEntry._idealStatePath);
- if (idealState == null) {
- LOGGER.warn("Failed to find ideal state for table: {}, skipping
updating routing entry",
- tableNameWithType);
- continue;
+ final int index = i;
+ futures.add(executorService.submit(() -> {
+ RoutingEntry cachedRoutingEntry = routingEntries.get(index);
+ Object tableLock =
getRoutingTableBuildLock(cachedRoutingEntry.getTableNameWithType());
Review Comment:
(minor) Cache `tableNameWithType`
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -177,38 +243,51 @@ private void processSegmentAssignmentChange() {
Stat[] externalViewStats = _zkDataAccessor.getStats(externalViewPaths,
AccessOption.PERSISTENT);
long fetchStatsEndTimeMs = System.currentTimeMillis();
- List<String> tablesToUpdate = new ArrayList<>();
+ ConcurrentLinkedQueue<String> tablesToUpdate = new
ConcurrentLinkedQueue<>();
+ List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < numTables; i++) {
- Stat idealStateStat = idealStateStats[i];
- Stat externalViewStat = externalViewStats[i];
+ final Stat idealStateStat = idealStateStats[i];
Review Comment:
(nit) Doesn't need to be `final`
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -177,38 +243,51 @@ private void processSegmentAssignmentChange() {
Stat[] externalViewStats = _zkDataAccessor.getStats(externalViewPaths,
AccessOption.PERSISTENT);
long fetchStatsEndTimeMs = System.currentTimeMillis();
- List<String> tablesToUpdate = new ArrayList<>();
+ ConcurrentLinkedQueue<String> tablesToUpdate = new
ConcurrentLinkedQueue<>();
+ List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < numTables; i++) {
- Stat idealStateStat = idealStateStats[i];
- Stat externalViewStat = externalViewStats[i];
+ final Stat idealStateStat = idealStateStats[i];
+ final Stat externalViewStat = externalViewStats[i];
if (idealStateStat != null && externalViewStat != null) {
- RoutingEntry routingEntry = routingEntries.get(i);
- if (idealStateStat.getVersion() !=
routingEntry.getLastUpdateIdealStateVersion()
- || externalViewStat.getVersion() !=
routingEntry.getLastUpdateExternalViewVersion()) {
- String tableNameWithType = routingEntry.getTableNameWithType();
- tablesToUpdate.add(tableNameWithType);
- try {
- IdealState idealState =
getIdealState(routingEntry._idealStatePath);
- if (idealState == null) {
- LOGGER.warn("Failed to find ideal state for table: {}, skipping
updating routing entry",
- tableNameWithType);
- continue;
+ final int index = i;
+ futures.add(executorService.submit(() -> {
+ RoutingEntry cachedRoutingEntry = routingEntries.get(index);
Review Comment:
(minor) You can read this outside of the lambda given `routingEntries` is
already a snapshot
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -177,38 +243,51 @@ private void processSegmentAssignmentChange() {
Stat[] externalViewStats = _zkDataAccessor.getStats(externalViewPaths,
AccessOption.PERSISTENT);
long fetchStatsEndTimeMs = System.currentTimeMillis();
- List<String> tablesToUpdate = new ArrayList<>();
+ ConcurrentLinkedQueue<String> tablesToUpdate = new
ConcurrentLinkedQueue<>();
Review Comment:
(nit) `tablesUpdated`
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -489,132 +650,201 @@ public synchronized void
buildRoutingForLogicalTable(String logicalTableName) {
* Builds the routing for a table.
* @param tableNameWithType the name of the table
*/
- public synchronized void buildRouting(String tableNameWithType) {
- LOGGER.info("Building routing for table: {}", tableNameWithType);
-
- TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
- Preconditions.checkState(tableConfig != null, "Failed to find table config
for table: %s", tableNameWithType);
-
- String idealStatePath = getIdealStatePath(tableNameWithType);
- IdealState idealState = getIdealState(idealStatePath);
- Preconditions.checkState(idealState != null, "Failed to find ideal state
for table: %s", tableNameWithType);
- int idealStateVersion = idealState.getRecord().getVersion();
-
- String externalViewPath = getExternalViewPath(tableNameWithType);
- ExternalView externalView = getExternalView(externalViewPath);
- int externalViewVersion;
- // NOTE: External view might be null for new created tables. In such case,
create an empty one and set the version
- // to -1 to ensure the version does not match the next external view
- if (externalView == null) {
- externalView = new ExternalView(tableNameWithType);
- externalViewVersion = -1;
- } else {
- externalViewVersion = externalView.getRecord().getVersion();
+ public void buildRouting(String tableNameWithType) {
+ _globalLock.readLock().lock();
+ try {
+ buildRoutingInternal(tableNameWithType);
+ } finally {
+ _globalLock.readLock().unlock();
}
+ }
- Set<String> onlineSegments = getOnlineSegments(idealState);
+ private void buildRoutingInternal(String tableNameWithType) {
+ long buildStartTimeMs = System.currentTimeMillis();
+ Object tableLock = getRoutingTableBuildLock(tableNameWithType);
+ synchronized (tableLock) {
+ long lastBuildStartTimeMs =
getLastRoutingTableBuildStartTimeMs(tableNameWithType);
+ if (buildStartTimeMs <= lastBuildStartTimeMs) {
+ LOGGER.info("Skipping routing build for table: {} because the build
routing request timestamp {} "
+ + "is earlier than the last build start time: {}",
+ tableNameWithType, buildStartTimeMs, lastBuildStartTimeMs);
+ return;
+ }
- SegmentPreSelector segmentPreSelector =
- SegmentPreSelectorFactory.getSegmentPreSelector(tableConfig,
_propertyStore);
- Set<String> preSelectedOnlineSegments =
segmentPreSelector.preSelect(onlineSegments);
- SegmentSelector segmentSelector =
SegmentSelectorFactory.getSegmentSelector(tableConfig);
- segmentSelector.init(idealState, externalView, preSelectedOnlineSegments);
+ // Record build start time to gate older requests and to use to compare
with the timestamp for when
+ // the global processSegmentAssignmentChange() was last called
+ _routingTableBuildStartTimeMs.put(tableNameWithType,
System.currentTimeMillis());
- // Register segment pruners and initialize segment zk metadata fetcher.
- List<SegmentPruner> segmentPruners =
SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+ LOGGER.info("Building routing for table: {}", tableNameWithType);
- AdaptiveServerSelector adaptiveServerSelector =
-
AdaptiveServerSelectorFactory.getAdaptiveServerSelector(_serverRoutingStatsManager,
_pinotConfig);
- InstanceSelector instanceSelector =
- InstanceSelectorFactory.getInstanceSelector(tableConfig,
_propertyStore, _brokerMetrics,
- adaptiveServerSelector, _pinotConfig);
- instanceSelector.init(_routableServers, _enabledServerInstanceMap,
idealState, externalView,
- preSelectedOnlineSegments);
+ TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+ Preconditions.checkState(tableConfig != null, "Failed to find table
config for table: %s", tableNameWithType);
- // Add time boundary manager if both offline and real-time part exist for
a hybrid table
- TimeBoundaryManager timeBoundaryManager = null;
- String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
- if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
- // Current table is offline
- String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
- if (_routingEntryMap.containsKey(realtimeTableName)) {
- LOGGER.info("Adding time boundary manager for table: {}",
tableNameWithType);
- timeBoundaryManager = new TimeBoundaryManager(tableConfig,
_propertyStore, _brokerMetrics);
- timeBoundaryManager.init(idealState, externalView,
preSelectedOnlineSegments);
+ String idealStatePath = getIdealStatePath(tableNameWithType);
+ IdealState idealState = getIdealState(idealStatePath);
+ Preconditions.checkState(idealState != null, "Failed to find ideal state
for table: %s", tableNameWithType);
+ int idealStateVersion = idealState.getRecord().getVersion();
+
+ String externalViewPath = getExternalViewPath(tableNameWithType);
+ ExternalView externalView = getExternalView(externalViewPath);
+ int externalViewVersion;
+ // NOTE: External view might be null for new created tables. In such
case, create an empty one and set the
+ // version to -1 to ensure the version does not match the next external
view
+ if (externalView == null) {
+ externalView = new ExternalView(tableNameWithType);
+ externalViewVersion = -1;
+ } else {
+ externalViewVersion = externalView.getRecord().getVersion();
}
- } else {
- // Current table is real-time
- String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
- RoutingEntry offlineTableRoutingEntry =
_routingEntryMap.get(offlineTableName);
- if (offlineTableRoutingEntry != null &&
offlineTableRoutingEntry.getTimeBoundaryManager() == null) {
- LOGGER.info("Adding time boundary manager for table: {}",
offlineTableName);
-
- // NOTE: Add time boundary manager to the offline part before adding
the routing for the real-time part to
- // ensure no overlapping data getting queried
- TableConfig offlineTableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, offlineTableName);
- Preconditions.checkState(offlineTableConfig != null, "Failed to find
table config for table: %s",
- offlineTableName);
- IdealState offlineTableIdealState =
getIdealState(getIdealStatePath(offlineTableName));
- Preconditions.checkState(offlineTableIdealState != null, "Failed to
find ideal state for table: %s",
- offlineTableName);
- // NOTE: External view might be null for new created tables. In such
case, create an empty one.
- ExternalView offlineTableExternalView =
getExternalView(getExternalViewPath(offlineTableName));
- if (offlineTableExternalView == null) {
- offlineTableExternalView = new ExternalView(offlineTableName);
+
+ Set<String> onlineSegments = getOnlineSegments(idealState);
+
+ SegmentPreSelector segmentPreSelector =
+ SegmentPreSelectorFactory.getSegmentPreSelector(tableConfig,
_propertyStore);
+ Set<String> preSelectedOnlineSegments =
segmentPreSelector.preSelect(onlineSegments);
+ SegmentSelector segmentSelector =
SegmentSelectorFactory.getSegmentSelector(tableConfig);
+ segmentSelector.init(idealState, externalView,
preSelectedOnlineSegments);
+
+ // Register segment pruners and initialize segment zk metadata fetcher.
+ List<SegmentPruner> segmentPruners =
SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+
+ AdaptiveServerSelector adaptiveServerSelector =
+
AdaptiveServerSelectorFactory.getAdaptiveServerSelector(_serverRoutingStatsManager,
_pinotConfig);
+ InstanceSelector instanceSelector =
+ InstanceSelectorFactory.getInstanceSelector(tableConfig,
_propertyStore, _brokerMetrics,
+ adaptiveServerSelector, _pinotConfig);
+ instanceSelector.init(_routableServers, _enabledServerInstanceMap,
idealState, externalView,
+ preSelectedOnlineSegments);
+
+ // Add time boundary manager if both offline and real-time part exist
for a hybrid table
+ TimeBoundaryManager timeBoundaryManager = null;
+ String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
+ if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+ // Current table is offline
+ String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+ if (_routingEntryMap.containsKey(realtimeTableName)) {
+ LOGGER.info("Adding time boundary manager for table: {}",
tableNameWithType);
+ timeBoundaryManager = new TimeBoundaryManager(tableConfig,
_propertyStore, _brokerMetrics);
+ timeBoundaryManager.init(idealState, externalView,
preSelectedOnlineSegments);
+ }
+ } else {
+ // Current table is real-time
+ String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
+ RoutingEntry offlineTableRoutingEntry =
_routingEntryMap.get(offlineTableName);
+ if (offlineTableRoutingEntry != null &&
offlineTableRoutingEntry.getTimeBoundaryManager() == null) {
+ LOGGER.info("Adding time boundary manager for table: {}",
offlineTableName);
+
+ // NOTE: Add time boundary manager to the offline part before adding
the routing for the real-time part to
+ // ensure no overlapping data getting queried
+ TableConfig offlineTableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, offlineTableName);
+ Preconditions.checkState(offlineTableConfig != null, "Failed to find
table config for table: %s",
+ offlineTableName);
+ IdealState offlineTableIdealState =
getIdealState(getIdealStatePath(offlineTableName));
+ Preconditions.checkState(offlineTableIdealState != null, "Failed to
find ideal state for table: %s",
+ offlineTableName);
+ // NOTE: External view might be null for new created tables. In such
case, create an empty one.
+ ExternalView offlineTableExternalView =
getExternalView(getExternalViewPath(offlineTableName));
+ if (offlineTableExternalView == null) {
+ offlineTableExternalView = new ExternalView(offlineTableName);
+ }
+ Set<String> offlineTableOnlineSegments =
getOnlineSegments(offlineTableIdealState);
+ SegmentPreSelector offlineTableSegmentPreSelector =
+
SegmentPreSelectorFactory.getSegmentPreSelector(offlineTableConfig,
_propertyStore);
+ Set<String> offlineTablePreSelectedOnlineSegments =
+
offlineTableSegmentPreSelector.preSelect(offlineTableOnlineSegments);
+ TimeBoundaryManager offlineTableTimeBoundaryManager =
+ new TimeBoundaryManager(offlineTableConfig, _propertyStore,
_brokerMetrics);
+ offlineTableTimeBoundaryManager.init(offlineTableIdealState,
offlineTableExternalView,
+ offlineTablePreSelectedOnlineSegments);
+
offlineTableRoutingEntry.setTimeBoundaryManager(offlineTableTimeBoundaryManager);
}
- Set<String> offlineTableOnlineSegments =
getOnlineSegments(offlineTableIdealState);
- SegmentPreSelector offlineTableSegmentPreSelector =
-
SegmentPreSelectorFactory.getSegmentPreSelector(offlineTableConfig,
_propertyStore);
- Set<String> offlineTablePreSelectedOnlineSegments =
-
offlineTableSegmentPreSelector.preSelect(offlineTableOnlineSegments);
- TimeBoundaryManager offlineTableTimeBoundaryManager =
- new TimeBoundaryManager(offlineTableConfig, _propertyStore,
_brokerMetrics);
- offlineTableTimeBoundaryManager.init(offlineTableIdealState,
offlineTableExternalView,
- offlineTablePreSelectedOnlineSegments);
-
offlineTableRoutingEntry.setTimeBoundaryManager(offlineTableTimeBoundaryManager);
}
- }
- SegmentPartitionMetadataManager partitionMetadataManager = null;
- // TODO: Support multiple partition columns
- // TODO: Make partition pruner on top of the partition metadata manager to
avoid keeping 2 copies of the metadata
- if (_enablePartitionMetadataManager) {
- SegmentPartitionConfig segmentPartitionConfig =
tableConfig.getIndexingConfig().getSegmentPartitionConfig();
- if (segmentPartitionConfig != null) {
- Map<String, ColumnPartitionConfig> columnPartitionMap =
segmentPartitionConfig.getColumnPartitionMap();
- if (columnPartitionMap.size() == 1) {
- Map.Entry<String, ColumnPartitionConfig> partitionConfig =
columnPartitionMap.entrySet().iterator().next();
- LOGGER.info("Enabling SegmentPartitionMetadataManager for table: {}
on partition column: {}",
- tableNameWithType, partitionConfig.getKey());
- partitionMetadataManager = new
SegmentPartitionMetadataManager(tableNameWithType, partitionConfig.getKey(),
- partitionConfig.getValue().getFunctionName(),
partitionConfig.getValue().getNumPartitions());
- } else {
- LOGGER.warn("Cannot enable SegmentPartitionMetadataManager for
table: {} with multiple partition columns: {}",
- tableNameWithType, columnPartitionMap.keySet());
+ SegmentPartitionMetadataManager partitionMetadataManager = null;
+ // TODO: Support multiple partition columns
+ // TODO: Make partition pruner on top of the partition metadata manager
to avoid keeping 2 copies of the
+ // metadata
+ if (_enablePartitionMetadataManager) {
+ SegmentPartitionConfig segmentPartitionConfig =
tableConfig.getIndexingConfig().getSegmentPartitionConfig();
+ if (segmentPartitionConfig != null) {
+ Map<String, ColumnPartitionConfig> columnPartitionMap =
segmentPartitionConfig.getColumnPartitionMap();
+ if (columnPartitionMap.size() == 1) {
+ Map.Entry<String, ColumnPartitionConfig> partitionConfig =
+ columnPartitionMap.entrySet().iterator().next();
+ LOGGER.info("Enabling SegmentPartitionMetadataManager for table:
{} on partition column: {}",
+ tableNameWithType, partitionConfig.getKey());
+ partitionMetadataManager =
+ new SegmentPartitionMetadataManager(tableNameWithType,
partitionConfig.getKey(),
+ partitionConfig.getValue().getFunctionName(),
partitionConfig.getValue().getNumPartitions());
+ } else {
+ LOGGER.warn(
+ "Cannot enable SegmentPartitionMetadataManager for table: {}
with multiple partition columns: {}",
+ tableNameWithType, columnPartitionMap.keySet());
+ }
}
}
- }
- QueryConfig queryConfig = tableConfig.getQueryConfig();
- Long queryTimeoutMs = queryConfig != null ? queryConfig.getTimeoutMs() :
null;
+ QueryConfig queryConfig = tableConfig.getQueryConfig();
+ Long queryTimeoutMs = queryConfig != null ? queryConfig.getTimeoutMs() :
null;
- SegmentZkMetadataFetcher segmentZkMetadataFetcher = new
SegmentZkMetadataFetcher(tableNameWithType, _propertyStore);
- for (SegmentZkMetadataFetchListener listener : segmentPruners) {
- segmentZkMetadataFetcher.register(listener);
- }
- if (partitionMetadataManager != null) {
- segmentZkMetadataFetcher.register(partitionMetadataManager);
- }
- segmentZkMetadataFetcher.init(idealState, externalView,
preSelectedOnlineSegments);
+ SegmentZkMetadataFetcher segmentZkMetadataFetcher =
+ new SegmentZkMetadataFetcher(tableNameWithType, _propertyStore);
+ for (SegmentZkMetadataFetchListener listener : segmentPruners) {
+ segmentZkMetadataFetcher.register(listener);
+ }
+ if (partitionMetadataManager != null) {
+ segmentZkMetadataFetcher.register(partitionMetadataManager);
+ }
+ segmentZkMetadataFetcher.init(idealState, externalView,
preSelectedOnlineSegments);
+
+ RoutingEntry routingEntry =
+ new RoutingEntry(tableNameWithType, idealStatePath,
externalViewPath, segmentPreSelector, segmentSelector,
+ segmentPruners, instanceSelector, idealStateVersion,
externalViewVersion, segmentZkMetadataFetcher,
+ timeBoundaryManager, partitionMetadataManager, queryTimeoutMs,
!idealState.isEnabled());
+ if (_routingEntryMap.put(tableNameWithType, routingEntry) == null) {
+ LOGGER.info("Built routing for table: {}", tableNameWithType);
+ } else {
+ LOGGER.info("Rebuilt routing for table: {}", tableNameWithType);
+ }
- RoutingEntry routingEntry =
- new RoutingEntry(tableNameWithType, idealStatePath, externalViewPath,
segmentPreSelector, segmentSelector,
- segmentPruners, instanceSelector, idealStateVersion,
externalViewVersion, segmentZkMetadataFetcher,
- timeBoundaryManager, partitionMetadataManager, queryTimeoutMs,
!idealState.isEnabled());
- if (_routingEntryMap.put(tableNameWithType, routingEntry) == null) {
- LOGGER.info("Built routing for table: {}", tableNameWithType);
- } else {
- LOGGER.info("Rebuilt routing for table: {}", tableNameWithType);
+ // Check for updates to the IS / EV after adding the routing entry, as
it is possible that the
+ // processSegmentAssignmentChange() may have run and missed updating
this newly added entry. Only update
+ // the entry if:
+ // - The calculated build time for this table is older than the
processSegmentAssignmentChange() timestamp, and
+ // - The IS or EV version has changed since the entry was added
+ if (_routingTableBuildStartTimeMs.get(tableNameWithType) <
_processAssignmentChangeSnapshotTimestampMs) {
+ LOGGER.info("processSegmentAssignmentChange started after build
routing for table was started, check if "
+ + "routing entry needs to be updated for table: {} to prevent
missed updates", tableNameWithType);
+ idealStatePath = getIdealStatePath(tableNameWithType);
+ idealState = getIdealState(idealStatePath);
+ Preconditions.checkState(idealState != null, "Failed to find ideal
state for table: %s", tableNameWithType);
+ idealStateVersion = idealState.getRecord().getVersion();
+
+ externalViewPath = getExternalViewPath(tableNameWithType);
+ externalView = getExternalView(externalViewPath);
+ // NOTE: External view might be null for new created tables. In such
case, create an empty one and set the
+ // version to -1 to ensure the version does not match the next
external view
+ if (externalView == null) {
+ externalViewVersion = -1;
+ } else {
+ externalViewVersion = externalView.getRecord().getVersion();
+ }
+
+ RoutingEntry existingRoutingEntry =
_routingEntryMap.get(tableNameWithType);
Review Comment:
(minor) I think you can directly use `routingEntry` given table lock is held
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]