This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new aa46eb7775a Add endpoint to find segments with invalid partition metadata (#18057)
aa46eb7775a is described below

commit aa46eb7775a6032cb4b796a50e960ca95cfe59d0
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Mar 31 19:30:02 2026 -0700

    Add endpoint to find segments with invalid partition metadata (#18057)
---
 .../api/resources/PinotSegmentRestletResource.java | 144 +++++++++++++++------
 .../resources/PinotSegmentRestletResourceTest.java |  85 ++++++++++++
 2 files changed, 188 insertions(+), 41 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 6355371273c..a8c52298253 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -70,6 +70,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.lineage.SegmentLineage;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.DatabaseUtils;
 import org.apache.pinot.common.utils.LLCSegmentName;
@@ -87,10 +88,12 @@ import org.apache.pinot.controller.util.TableTierReader;
 import org.apache.pinot.core.auth.Actions;
 import org.apache.pinot.core.auth.Authorize;
 import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
@@ -166,15 +169,18 @@ import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_K
  *   </li>
  * </ul>
  */
-@Api(tags = Constants.SEGMENT_TAG, authorizations = {@Authorization(value = 
SWAGGER_AUTHORIZATION_KEY),
-    @Authorization(value = DATABASE)})
+@Api(tags = Constants.SEGMENT_TAG, authorizations = {
+    @Authorization(value = SWAGGER_AUTHORIZATION_KEY),
+    @Authorization(value = DATABASE)
+})
 @SwaggerDefinition(securityDefinition = 
@SecurityDefinition(apiKeyAuthDefinitions = {
     @ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in = 
ApiKeyAuthDefinition.ApiKeyLocation.HEADER,
         key = SWAGGER_AUTHORIZATION_KEY,
         description = "The format of the key is  ```\"Basic <token>\" or 
\"Bearer <token>\"```"),
     @ApiKeyAuthDefinition(name = DATABASE, in = 
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = DATABASE,
         description = "Database context passed through http header. If no 
context is provided 'default' database "
-            + "context will be considered.")}))
+            + "context will be considered.")
+}))
 @Path("/")
 public class PinotSegmentRestletResource {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotSegmentRestletResource.class);
@@ -372,7 +378,6 @@ public class PinotSegmentRestletResource {
     return segmentZKMetadata != null ? segmentZKMetadata.toMap() : null;
   }
 
-
   /**
    * Helper method to find the existing table based on the given table name 
(with or without type suffix) and segment
    * name.
@@ -404,10 +409,10 @@ public class PinotSegmentRestletResource {
           + "it again", notes = "Resets a segment by disabling and then 
enabling it")
   public SuccessResponse resetSegment(
       @ApiParam(value = "Name of the table with type", required = true) 
@PathParam("tableNameWithType")
-          String tableNameWithType,
+      String tableNameWithType,
       @ApiParam(value = "Name of the segment", required = true) 
@PathParam("segmentName") @Encoded String segmentName,
       @ApiParam(value = "Name of the target instance to reset") 
@QueryParam("targetInstance") @Nullable
-          String targetInstance, @Context HttpHeaders headers) {
+      String targetInstance, @Context HttpHeaders headers) {
     tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, 
headers);
     segmentName = URIUtils.decode(segmentName);
     TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
@@ -444,9 +449,9 @@ public class PinotSegmentRestletResource {
           + " and finally enabling them", notes = "Resets segments by 
disabling and then enabling them")
   public SuccessResponse resetSegments(
       @ApiParam(value = "Name of the table with type", required = true) 
@PathParam("tableNameWithType")
-          String tableNameWithType,
+      String tableNameWithType,
       @ApiParam(value = "Name of the target instance to reset") 
@QueryParam("targetInstance") @Nullable
-          String targetInstance,
+      String targetInstance,
       @ApiParam(value = "Whether to reset only segments with error state") 
@QueryParam("errorSegmentsOnly")
       @DefaultValue("false") boolean errorSegmentsOnly, @Context HttpHeaders 
headers) {
     tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, 
headers);
@@ -728,7 +733,6 @@ public class PinotSegmentRestletResource {
     return segmentsMetadata;
   }
 
-
   @GET
   @Path("segments/{tableNameWithType}/isStale")
   @Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType", 
action = Actions.Table.GET_METADATA)
@@ -745,7 +749,7 @@ public class PinotSegmentRestletResource {
       TableMetadataReader tableMetadataReader =
           new TableMetadataReader(_executor, _connectionManager, 
_pinotHelixResourceManager);
       return tableMetadataReader.getStaleSegments(tableNameWithType,
-              _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+          _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
     } catch (InvalidConfigException e) {
       throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Status.BAD_REQUEST);
     } catch (IOException ioe) {
@@ -779,6 +783,64 @@ public class PinotSegmentRestletResource {
     return segmentToMetadataMap;
   }
 
+  @GET
+  @Path("segments/{tableName}/invalidPartitionMetadata")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = 
Actions.Table.GET_METADATA)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(
+      value = "Get the invalid partition metadata for all table segments",
+      notes = "Get the invalid partition metadata for all table segments"
+  )
+  public Map<String, String> getSegmentsWithInvalidPartitionMetadata(
+      @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") @Nullable 
String tableTypeStr,
+      @ApiParam(value = "Partition column") @QueryParam("partitionColumn") 
@Nullable String partitionColumn,
+      @Context HttpHeaders headers) {
+    String tableNameWithType = 
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager,
+        DatabaseUtils.translateTableName(tableName, headers), 
Constants.validateTableType(tableTypeStr), LOGGER).get(0);
+    LOGGER.info("Received a request to fetch invalid partition metadata for 
all segments for table {}",
+        tableNameWithType);
+    Map<String, String> segmentsWithInvalidPartitionMetadata = new HashMap<>();
+    List<SegmentZKMetadata> segmentZKMetadataList = 
_pinotHelixResourceManager.getSegmentsZKMetadata(tableNameWithType);
+    for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
+      String partitionMetadataJson =
+          
segmentZKMetadata.getSimpleFields().get(CommonConstants.Segment.PARTITION_METADATA);
+      if (!isValidPartitionMetadata(partitionMetadataJson, partitionColumn)) {
+        
segmentsWithInvalidPartitionMetadata.put(segmentZKMetadata.getSegmentName(), 
partitionMetadataJson);
+      }
+    }
+    return segmentsWithInvalidPartitionMetadata;
+  }
+
+  /// When partition column is not specified, returns `true` when all columns 
within the metadata contain single
+  /// partition. Treat `null` metadata as valid.
+  /// When partition column is specified, return `true` when the specified 
column within the metadata contains single
+  /// partition. Treat `null` metadata as invalid.
+  private boolean isValidPartitionMetadata(@Nullable String 
partitionMetadataJson, @Nullable String partitionColumn) {
+    if (StringUtils.isBlank(partitionColumn)) {
+      partitionColumn = null;
+    }
+    if (partitionMetadataJson == null) {
+      return partitionColumn == null;
+    }
+    SegmentPartitionMetadata metadata;
+    try {
+      metadata = 
SegmentPartitionMetadata.fromJsonString(partitionMetadataJson);
+    } catch (Exception e) {
+      return false;
+    }
+    if (partitionColumn == null) {
+      for (ColumnPartitionMetadata columnMetadata : 
metadata.getColumnPartitionMap().values()) {
+        if (columnMetadata.getPartitions().size() != 1) {
+          return false;
+        }
+      }
+      return true;
+    }
+    Set<Integer> partitions = metadata.getPartitions(partitionColumn);
+    return partitions != null && partitions.size() == 1;
+  }
+
   @GET
   @Path("segments/{tableName}/tiers")
   @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = 
Actions.Table.GET_STORAGE_TIER)
@@ -859,9 +921,9 @@ public class PinotSegmentRestletResource {
       @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
       @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String 
tableTypeStr,
       @ApiParam(value = "Start timestamp (inclusive) in milliseconds") 
@QueryParam("startTimestamp") @DefaultValue("")
-          String startTimestampStr,
+      String startTimestampStr,
       @ApiParam(value = "End timestamp (exclusive) in milliseconds") 
@QueryParam("endTimestamp") @DefaultValue("")
-          String endTimestampStr,
+      String endTimestampStr,
       @ApiParam(value = "Whether to exclude the segments overlapping with the 
timestamps, false by default")
       @QueryParam("excludeOverlapping") @DefaultValue("false") boolean 
excludeOverlapping) {
     long startTimestamp;
@@ -1120,39 +1182,39 @@ public class PinotSegmentRestletResource {
    * @return
    */
   private SuccessResponse updateZKTimeIntervalInternal(String 
tableNameWithType) {
-      TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
-      if (tableConfig == null) {
-        throw new ControllerApplicationException(LOGGER,
-            "Failed to find table config for table: " + tableNameWithType, 
Status.NOT_FOUND);
-      }
+    TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      throw new ControllerApplicationException(LOGGER,
+          "Failed to find table config for table: " + tableNameWithType, 
Status.NOT_FOUND);
+    }
 
-      Schema tableSchema = 
_pinotHelixResourceManager.getTableSchema(tableNameWithType);
-      if (tableSchema == null) {
-        throw new ControllerApplicationException(LOGGER,
-            "Failed to find schema for table: " + tableNameWithType, 
Status.NOT_FOUND);
-      }
+    Schema tableSchema = 
_pinotHelixResourceManager.getTableSchema(tableNameWithType);
+    if (tableSchema == null) {
+      throw new ControllerApplicationException(LOGGER,
+          "Failed to find schema for table: " + tableNameWithType, 
Status.NOT_FOUND);
+    }
 
-      String timeColumn = 
tableConfig.getValidationConfig().getTimeColumnName();
-      if (StringUtils.isEmpty(timeColumn)) {
-        throw new ControllerApplicationException(LOGGER,
-            "Failed to find time column for table : " + tableNameWithType, 
Status.NOT_FOUND);
-      }
+    String timeColumn = tableConfig.getValidationConfig().getTimeColumnName();
+    if (StringUtils.isEmpty(timeColumn)) {
+      throw new ControllerApplicationException(LOGGER,
+          "Failed to find time column for table : " + tableNameWithType, 
Status.NOT_FOUND);
+    }
 
-      DateTimeFieldSpec timeColumnFieldSpec = 
tableSchema.getSpecForTimeColumn(timeColumn);
-      if (timeColumnFieldSpec == null) {
-        throw new ControllerApplicationException(LOGGER,
-            String.format("Failed to find field spec for column: %s and table: 
%s", timeColumn, tableNameWithType),
-            Status.NOT_FOUND);
-      }
+    DateTimeFieldSpec timeColumnFieldSpec = 
tableSchema.getSpecForTimeColumn(timeColumn);
+    if (timeColumnFieldSpec == null) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to find field spec for column: %s and table: 
%s", timeColumn, tableNameWithType),
+          Status.NOT_FOUND);
+    }
 
-      try {
-        
_pinotHelixResourceManager.updateSegmentsZKTimeInterval(tableNameWithType, 
timeColumnFieldSpec);
-      } catch (Exception e) {
-        throw new ControllerApplicationException(LOGGER,
-            String.format("Failed to update time interval zk metadata for 
table %s", tableNameWithType),
-            Status.INTERNAL_SERVER_ERROR, e);
-      }
-      return new SuccessResponse("Successfully updated time interval zk 
metadata for table: " + tableNameWithType);
+    try {
+      
_pinotHelixResourceManager.updateSegmentsZKTimeInterval(tableNameWithType, 
timeColumnFieldSpec);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to update time interval zk metadata for table 
%s", tableNameWithType),
+          Status.INTERNAL_SERVER_ERROR, e);
+    }
+    return new SuccessResponse("Successfully updated time interval zk metadata 
for table: " + tableNameWithType);
   }
 
   private List<Pair<TableType, List<String>>> selectSegments(
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java
index 97f14aa2ee7..8cc2a397c27 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java
@@ -27,19 +27,30 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.mockito.InjectMocks;
+import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 
 
 public class PinotSegmentRestletResourceTest {
 
+  @Mock
+  PinotHelixResourceManager _pinotHelixResourceManager;
+
   @InjectMocks
   PinotSegmentRestletResource _pinotSegmentRestletResource;
 
@@ -132,6 +143,80 @@ public class PinotSegmentRestletResourceTest {
     assertEquals(expectedResult, result);
   }
 
+  @Test
+  public void testGetSegmentsWithInvalidPartitionMetadata()
+      throws TableNotFoundException {
+    String tableName = "testTable";
+    String tableNameWithType = "testTable_OFFLINE";
+    
when(_pinotHelixResourceManager.getExistingTableNamesWithType(eq(tableName), 
any()))
+        .thenReturn(List.of(tableNameWithType));
+
+    // seg0: null partition metadata, no column filter → valid
+    SegmentZKMetadata seg0 = new SegmentZKMetadata("seg0");
+
+    // seg1: single partition per column, no column filter → valid
+    SegmentZKMetadata seg1 = new SegmentZKMetadata("seg1");
+    seg1.getSimpleFields().put(CommonConstants.Segment.PARTITION_METADATA,
+        
"{\"columnPartitionMap\":{\"col\":{\"functionName\":\"Modulo\",\"numPartitions\":4,\"partitions\":[0]}}}");
+
+    // seg2: multiple partitions for a column, no column filter → invalid
+    SegmentZKMetadata seg2 = new SegmentZKMetadata("seg2");
+    seg2.getSimpleFields().put(CommonConstants.Segment.PARTITION_METADATA,
+        
"{\"columnPartitionMap\":{\"col\":{\"functionName\":\"Modulo\",\"numPartitions\":4,\"partitions\":[0,1]}}}");
+
+    // seg3: malformed JSON → invalid
+    SegmentZKMetadata seg3 = new SegmentZKMetadata("seg3");
+    seg3.getSimpleFields().put(CommonConstants.Segment.PARTITION_METADATA, 
"not-valid-json");
+
+    when(_pinotHelixResourceManager.getSegmentsZKMetadata(tableNameWithType))
+        .thenReturn(List.of(seg0, seg1, seg2, seg3));
+
+    // no column filter: seg0 (null) is valid, seg1 is valid, seg2 
(multi-partition) and seg3 (malformed) are invalid
+    Map<String, String> result =
+        
_pinotSegmentRestletResource.getSegmentsWithInvalidPartitionMetadata(tableName, 
"OFFLINE", null, null);
+    assertEquals(result.size(), 2);
+    assertTrue(result.containsKey("seg2"));
+    assertTrue(result.containsKey("seg3"));
+  }
+
+  @Test
+  public void testGetSegmentsWithInvalidPartitionMetadataWithColumnFilter()
+      throws TableNotFoundException {
+    String tableName = "testTable2";
+    String tableNameWithType = "testTable2_OFFLINE";
+    
when(_pinotHelixResourceManager.getExistingTableNamesWithType(eq(tableName), 
any()))
+        .thenReturn(List.of(tableNameWithType));
+
+    // seg0: null partition metadata, column specified → invalid
+    SegmentZKMetadata seg0 = new SegmentZKMetadata("seg0");
+
+    // seg1: single partition for the specified column → valid
+    SegmentZKMetadata seg1 = new SegmentZKMetadata("seg1");
+    seg1.getSimpleFields().put(CommonConstants.Segment.PARTITION_METADATA,
+        
"{\"columnPartitionMap\":{\"col\":{\"functionName\":\"Modulo\",\"numPartitions\":4,\"partitions\":[2]}}}");
+
+    // seg2: multiple partitions for the specified column → invalid
+    SegmentZKMetadata seg2 = new SegmentZKMetadata("seg2");
+    seg2.getSimpleFields().put(CommonConstants.Segment.PARTITION_METADATA,
+        
"{\"columnPartitionMap\":{\"col\":{\"functionName\":\"Modulo\",\"numPartitions\":4,\"partitions\":[0,1]}}}");
+
+    // seg3: specified column not present in partition metadata → invalid
+    SegmentZKMetadata seg3 = new SegmentZKMetadata("seg3");
+    seg3.getSimpleFields().put(CommonConstants.Segment.PARTITION_METADATA,
+        
"{\"columnPartitionMap\":{\"otherCol\":{\"functionName\":\"Modulo\",\"numPartitions\":4,\"partitions\":[0]}}}");
+
+    when(_pinotHelixResourceManager.getSegmentsZKMetadata(tableNameWithType))
+        .thenReturn(List.of(seg0, seg1, seg2, seg3));
+
+    // with column filter "col": seg0 (null metadata), seg2 (multi-partition), 
seg3 (column absent) are invalid
+    Map<String, String> result =
+        
_pinotSegmentRestletResource.getSegmentsWithInvalidPartitionMetadata(tableName, 
"OFFLINE", "col", null);
+    assertEquals(result.size(), 3);
+    assertTrue(result.containsKey("seg0"));
+    assertTrue(result.containsKey("seg2"));
+    assertTrue(result.containsKey("seg3"));
+  }
+
   private List<String> getSegmentForPartition(String tableName, int 
partitionID, int sequenceNumberOffset,
       int numberOfSegments, long currentTime) {
     List<String> segments = new ArrayList<>();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to