This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6dabd1d6f5c Fix metadata APIs to report primary key counts for dedup
tables (#17736)
6dabd1d6f5c is described below
commit 6dabd1d6f5c3c0207fa4f466e4c063cb56aaf2a6
Author: Jinesh Parakh <[email protected]>
AuthorDate: Wed Feb 25 16:16:21 2026 +0530
Fix metadata APIs to report primary key counts for dedup tables (#17736)
Background:
Pinot tracks per-partition primary key counts for realtime tables with
upsert or dedup enabled. These counts are exposed via two server APIs:
- `/tables/{tableName}/metadata` — per-partition PK counts per server
- `/instance/primaryKeyCount` — total PK count across all tables on the
instance
Issue:
`RealtimeTableDataManager.getUpsertPartitionToPrimaryKeyCount()` only
checked `isUpsertEnabled()`. For dedup tables, it always returned an empty
map — even though the dedup metadata manager already tracked PK counts
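internally. This caused the metadata API to return an empty PK count map
for dedup tables. The instance-level API already counted dedup PKs, but
only through its own private helper in `PrimaryKeyCount` that duplicated
the upsert/dedup dispatch instead of sharing it with the table data
manager.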
Fix:
- Renamed `getUpsertPartitionToPrimaryKeyCount()` to
`getPartitionToPrimaryKeyCount()` and added a fallback to check
`isDedupEnabled()` and delegate to `_tableDedupMetadataManager`.
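- Renamed internal fields/getters from `upsert`-prefixed to generic names
across `TableMetadataInfo`, `ServerSegmentMetadataReader`, and
`TablesResource`.
- Removed the private `getPartitionToPrimaryKeyCount(...)` helper in
`PrimaryKeyCount`; it now calls
`RealtimeTableDataManager.getPartitionToPrimaryKeyCount()` directly.
- The `@JsonProperty` name `upsertPartitionToServerPrimaryKeyCountMap` is
explicitly kept on both the constructor parameter and the renamed getter
to preserve the JSON wire format during rolling upgrades.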
Tests:
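Updated the `PrimaryKeyCountTest` mocks to stub
`RealtimeTableDataManager.getPartitionToPrimaryKeyCount()`.
Manual verification with a cluster running both upsert and dedup tables: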
Before fix:
> curl -s http://localhost:7500/tables/dedupMeetupRsvp_REALTIME/metadata
{
"tableName" : "dedupMeetupRsvp_REALTIME",
"diskSizeInBytes" : 0,
"numSegments" : 1,
"numRows" : 0,
"columnLengthMap" : { },
"columnCardinalityMap" : { },
"maxNumMultiValuesMap" : { },
"columnIndexSizeMap" : { },
"upsertPartitionToServerPrimaryKeyCountMap" : { }
}
> curl -s http://localhost:7500/instance/primaryKeyCount
{
"instanceId" : "Server_100.112.214.70_7050",
"numPrimaryKeys" : 63,
"upsertAndDedupTables" : [ "upsertPartialMeetupRsvp_REALTIME",
"dedupMeetupRsvp_REALTIME" ],
"lastUpdatedTimeInEpochMs" : 1771584881254
}
After fix:
> curl -s http://localhost:7500/tables/dedupMeetupRsvp_REALTIME/metadata
{
"tableName" : "dedupMeetupRsvp_REALTIME",
"diskSizeInBytes" : 0,
"numSegments" : 1,
"numRows" : 0,
"columnLengthMap" : { },
"columnCardinalityMap" : { },
"maxNumMultiValuesMap" : { },
"columnIndexSizeMap" : { },
"upsertPartitionToServerPrimaryKeyCountMap" : {
"1" : {
"Server_100.112.214.70_7050" : 35
}
}
}
> curl -s http://localhost:7500/instance/primaryKeyCount
{
"instanceId" : "Server_100.112.214.70_7050",
"numPrimaryKeys" : 89,
"upsertAndDedupTables" : [ "upsertPartialMeetupRsvp_REALTIME",
"dedupMeetupRsvp_REALTIME" ],
"lastUpdatedTimeInEpochMs" : 1771584741720
}
Signed-off-by: Jinesh Parakh <[email protected]>
---
.../common/restlet/resources/TableMetadataInfo.java | 13 ++++++++-----
.../controller/util/ServerSegmentMetadataReader.java | 8 ++++----
.../data/manager/realtime/RealtimeTableDataManager.java | 6 +++++-
.../pinot/server/api/resources/PrimaryKeyCount.java | 12 +-----------
.../pinot/server/api/resources/TablesResource.java | 16 ++++++++--------
.../org/apache/pinot/server/api/PrimaryKeyCountTest.java | 5 +++++
6 files changed, 31 insertions(+), 29 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableMetadataInfo.java
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableMetadataInfo.java
index 4a6953ac2c0..21468d7d426 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableMetadataInfo.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableMetadataInfo.java
@@ -43,7 +43,9 @@ public class TableMetadataInfo {
private final Map<String, Double> _columnCardinalityMap;
private final Map<String, Double> _maxNumMultiValuesMap;
private final Map<String, Map<String, Double>> _columnIndexSizeMap;
- private final Map<Integer, Map<String, Long>>
_upsertPartitionToServerPrimaryKeyCountMap;
+ // JSON property name kept as "upsertPartitionToServerPrimaryKeyCountMap" to
avoid silent data loss during rolling
+ // upgrades where servers and controllers may temporarily run different
versions of this class.
+ private final Map<Integer, Map<String, Long>>
_partitionToServerPrimaryKeyCountMap;
@JsonCreator
public TableMetadataInfo(@JsonProperty("tableName") String tableName,
@@ -53,7 +55,7 @@ public class TableMetadataInfo {
@JsonProperty("maxNumMultiValuesMap") Map<String, Double>
maxNumMultiValuesMap,
@JsonProperty("columnIndexSizeMap") Map<String, Map<String, Double>>
columnIndexSizeMap,
@JsonProperty("upsertPartitionToServerPrimaryKeyCountMap")
- Map<Integer, Map<String, Long>>
upsertPartitionToServerPrimaryKeyCountMap) {
+ Map<Integer, Map<String, Long>> partitionToServerPrimaryKeyCountMap) {
_tableName = tableName;
_diskSizeInBytes = sizeInBytes;
_numSegments = numSegments;
@@ -62,7 +64,7 @@ public class TableMetadataInfo {
_columnCardinalityMap = columnCardinalityMap;
_maxNumMultiValuesMap = maxNumMultiValuesMap;
_columnIndexSizeMap = columnIndexSizeMap;
- _upsertPartitionToServerPrimaryKeyCountMap =
upsertPartitionToServerPrimaryKeyCountMap;
+ _partitionToServerPrimaryKeyCountMap = partitionToServerPrimaryKeyCountMap;
}
public String getTableName() {
@@ -97,7 +99,8 @@ public class TableMetadataInfo {
return _columnIndexSizeMap;
}
- public Map<Integer, Map<String, Long>>
getUpsertPartitionToServerPrimaryKeyCountMap() {
- return _upsertPartitionToServerPrimaryKeyCountMap;
+ @JsonProperty("upsertPartitionToServerPrimaryKeyCountMap")
+ public Map<Integer, Map<String, Long>>
getPartitionToServerPrimaryKeyCountMap() {
+ return _partitionToServerPrimaryKeyCountMap;
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
index 53cf628c7fb..a1669a2882b 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
@@ -119,7 +119,7 @@ public class ServerSegmentMetadataReader {
final Map<String, Double> columnCardinalityMap = new HashMap<>();
final Map<String, Double> maxNumMultiValuesMap = new HashMap<>();
final Map<String, Map<String, Double>> columnIndexSizeMap = new
HashMap<>();
- final Map<Integer, Map<String, Long>>
upsertPartitionToServerPrimaryKeyCountMap = new HashMap<>();
+ final Map<Integer, Map<String, Long>> partitionToServerPrimaryKeyCountMap
= new HashMap<>();
for (Map.Entry<String, String> streamResponse :
serviceResponse._httpResponses.entrySet()) {
try {
TableMetadataInfo tableMetadataInfo =
@@ -136,8 +136,8 @@ public class ServerSegmentMetadataReader {
}
return l;
}));
-
tableMetadataInfo.getUpsertPartitionToServerPrimaryKeyCountMap().forEach(
- (partition, serverToPrimaryKeyCount) ->
upsertPartitionToServerPrimaryKeyCountMap.merge(partition,
+ tableMetadataInfo.getPartitionToServerPrimaryKeyCountMap().forEach(
+ (partition, serverToPrimaryKeyCount) ->
partitionToServerPrimaryKeyCountMap.merge(partition,
new HashMap<>(serverToPrimaryKeyCount), (l, r) -> {
for (Map.Entry<String, Long> serverToPKCount : r.entrySet())
{
l.merge(serverToPKCount.getKey(),
serverToPKCount.getValue(), Long::sum);
@@ -167,7 +167,7 @@ public class ServerSegmentMetadataReader {
TableMetadataInfo aggregateTableMetadataInfo =
new TableMetadataInfo(tableNameWithType, totalDiskSizeInBytes,
totalNumSegments, totalNumRows, columnLengthMap,
- columnCardinalityMap, maxNumMultiValuesMap, columnIndexSizeMap,
upsertPartitionToServerPrimaryKeyCountMap);
+ columnCardinalityMap, maxNumMultiValuesMap, columnIndexSizeMap,
partitionToServerPrimaryKeyCountMap);
if (failedParses != 0) {
LOGGER.warn("Failed to parse {} / {} aggregated segment metadata
responses from servers.", failedParses,
serverUrls.size());
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index d6c8407abaf..3bbba6b9579 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -937,13 +937,17 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
/**
* Retrieves a mapping of partition id to the primary key count for the
partition.
+ * Supports both upsert and dedup enabled tables.
*
* @return A {@code Map} where keys are partition id and values are count of
primary keys for that specific partition.
*/
- public Map<Integer, Long> getUpsertPartitionToPrimaryKeyCount() {
+ public Map<Integer, Long> getPartitionToPrimaryKeyCount() {
if (isUpsertEnabled()) {
return _tableUpsertMetadataManager.getPartitionToPrimaryKeyCount();
}
+ if (isDedupEnabled()) {
+ return _tableDedupMetadataManager.getPartitionToPrimaryKeyCount();
+ }
return Collections.emptyMap();
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/PrimaryKeyCount.java
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/PrimaryKeyCount.java
index 8886913daec..e45881a5e88 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/PrimaryKeyCount.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/PrimaryKeyCount.java
@@ -62,7 +62,7 @@ public class PrimaryKeyCount {
}
if (tableDataManager instanceof RealtimeTableDataManager) {
Map<Integer, Long> partitionToPrimaryKeyCount =
- getPartitionToPrimaryKeyCount((RealtimeTableDataManager)
tableDataManager);
+ ((RealtimeTableDataManager)
tableDataManager).getPartitionToPrimaryKeyCount();
if (!partitionToPrimaryKeyCount.isEmpty()) {
tablesWithPrimaryKeys.add(tableNameWithType);
@@ -76,14 +76,4 @@ public class PrimaryKeyCount {
return new PrimaryKeyCountInfo(instanceId, totalPrimaryKeyCount,
tablesWithPrimaryKeys, System.currentTimeMillis());
}
-
- private static Map<Integer, Long>
getPartitionToPrimaryKeyCount(RealtimeTableDataManager tableDataManager) {
- // Fetch the primary key count per partition if either upsert or dedup is
enabled
- if (tableDataManager.isUpsertEnabled()) {
- return
tableDataManager.getTableUpsertMetadataManager().getPartitionToPrimaryKeyCount();
- } else if (tableDataManager.isDedupEnabled()) {
- return
tableDataManager.getTableDedupMetadataManager().getPartitionToPrimaryKeyCount();
- }
- return Map.of();
- }
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index 14ecfa698d8..85a842d54e7 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -296,23 +296,23 @@ public class TablesResource {
}
}
- // fetch partition to primary key count for realtime tables that have
upsert enabled
- Map<Integer, Long> upsertPartitionToPrimaryKeyCountMap = new HashMap<>();
+ // fetch partition to primary key count for realtime tables that have
upsert or dedup enabled
+ Map<Integer, Long> partitionToPrimaryKeyCountMap = new HashMap<>();
if (tableDataManager instanceof RealtimeTableDataManager) {
RealtimeTableDataManager realtimeTableDataManager =
(RealtimeTableDataManager) tableDataManager;
- upsertPartitionToPrimaryKeyCountMap =
realtimeTableDataManager.getUpsertPartitionToPrimaryKeyCount();
+ partitionToPrimaryKeyCountMap =
realtimeTableDataManager.getPartitionToPrimaryKeyCount();
}
- // construct upsertPartitionToServerPrimaryKeyCountMap to populate in
TableMetadataInfo
- Map<Integer, Map<String, Long>> upsertPartitionToServerPrimaryKeyCountMap
= new HashMap<>();
- upsertPartitionToPrimaryKeyCountMap.forEach(
- (partition, primaryKeyCount) ->
upsertPartitionToServerPrimaryKeyCountMap.put(partition,
+ // construct partitionToServerPrimaryKeyCountMap to populate in
TableMetadataInfo
+ Map<Integer, Map<String, Long>> partitionToServerPrimaryKeyCountMap = new
HashMap<>();
+ partitionToPrimaryKeyCountMap.forEach(
+ (partition, primaryKeyCount) ->
partitionToServerPrimaryKeyCountMap.put(partition,
Map.of(instanceDataManager.getInstanceId(), primaryKeyCount)));
TableMetadataInfo tableMetadataInfo =
new TableMetadataInfo(tableDataManager.getTableName(),
totalSegmentSizeBytes, segmentDataManagers.size(),
totalNumRows, columnLengthMap, columnCardinalityMap,
maxNumMultiValuesMap, columnIndexSizesMap,
- upsertPartitionToServerPrimaryKeyCountMap);
+ partitionToServerPrimaryKeyCountMap);
return ResourceUtils.convertToJsonString(tableMetadataInfo);
}
diff --git
a/pinot-server/src/test/java/org/apache/pinot/server/api/PrimaryKeyCountTest.java
b/pinot-server/src/test/java/org/apache/pinot/server/api/PrimaryKeyCountTest.java
index 14ba30e6188..b167dc1d5fa 100644
---
a/pinot-server/src/test/java/org/apache/pinot/server/api/PrimaryKeyCountTest.java
+++
b/pinot-server/src/test/java/org/apache/pinot/server/api/PrimaryKeyCountTest.java
@@ -139,6 +139,7 @@ public class PrimaryKeyCountTest {
TableUpsertMetadataManager tableUpsertMetadataManager =
mock(TableUpsertMetadataManager.class);
when(tableUpsertMetadataManager.getPartitionToPrimaryKeyCount()).thenReturn(partitionToPrimaryKeyCountMap);
when(realtimeTableDataManager.getTableUpsertMetadataManager()).thenReturn(tableUpsertMetadataManager);
+
when(realtimeTableDataManager.getPartitionToPrimaryKeyCount()).thenReturn(partitionToPrimaryKeyCountMap);
// Mock the instance data manager
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
@@ -186,6 +187,7 @@ public class PrimaryKeyCountTest {
TableDedupMetadataManager tableDedupMetadataManager =
mock(TableDedupMetadataManager.class);
when(tableDedupMetadataManager.getPartitionToPrimaryKeyCount()).thenReturn(partitionToPrimaryKeyCountMap);
when(realtimeTableDataManager.getTableDedupMetadataManager()).thenReturn(tableDedupMetadataManager);
+
when(realtimeTableDataManager.getPartitionToPrimaryKeyCount()).thenReturn(partitionToPrimaryKeyCountMap);
// Mock the instance data manager
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
@@ -244,11 +246,14 @@ public class PrimaryKeyCountTest {
TableUpsertMetadataManager tableUpsertMetadataManager =
mock(TableUpsertMetadataManager.class);
when(tableUpsertMetadataManager.getPartitionToPrimaryKeyCount()).thenReturn(upsertPartitionToPrimaryKeyCountMap);
when(upsertRealtimeTableDataManager.getTableUpsertMetadataManager()).thenReturn(tableUpsertMetadataManager);
+ when(upsertRealtimeTableDataManager.getPartitionToPrimaryKeyCount())
+ .thenReturn(upsertPartitionToPrimaryKeyCountMap);
// Mock the dedup manager
TableDedupMetadataManager tableDedupMetadataManager =
mock(TableDedupMetadataManager.class);
when(tableDedupMetadataManager.getPartitionToPrimaryKeyCount()).thenReturn(dedupPartitionToPrimaryKeyCountMap);
when(dedupRealtimeTableDataManager.getTableDedupMetadataManager()).thenReturn(tableDedupMetadataManager);
+
when(dedupRealtimeTableDataManager.getPartitionToPrimaryKeyCount()).thenReturn(dedupPartitionToPrimaryKeyCountMap);
// Mock the instance data manager
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]