This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new aa46eb7775a Add endpoint to find segments with invalid partition
metadata (#18057)
aa46eb7775a is described below
commit aa46eb7775a6032cb4b796a50e960ca95cfe59d0
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Mar 31 19:30:02 2026 -0700
Add endpoint to find segments with invalid partition metadata (#18057)
---
.../api/resources/PinotSegmentRestletResource.java | 144 +++++++++++++++------
.../resources/PinotSegmentRestletResourceTest.java | 85 ++++++++++++
2 files changed, 188 insertions(+), 41 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 6355371273c..a8c52298253 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -70,6 +70,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.lineage.SegmentLineage;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.common.utils.LLCSegmentName;
@@ -87,10 +88,12 @@ import org.apache.pinot.controller.util.TableTierReader;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
@@ -166,15 +169,18 @@ import static
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_K
* </li>
* </ul>
*/
-@Api(tags = Constants.SEGMENT_TAG, authorizations = {@Authorization(value =
SWAGGER_AUTHORIZATION_KEY),
- @Authorization(value = DATABASE)})
+@Api(tags = Constants.SEGMENT_TAG, authorizations = {
+ @Authorization(value = SWAGGER_AUTHORIZATION_KEY),
+ @Authorization(value = DATABASE)
+})
@SwaggerDefinition(securityDefinition =
@SecurityDefinition(apiKeyAuthDefinitions = {
@ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in =
ApiKeyAuthDefinition.ApiKeyLocation.HEADER,
key = SWAGGER_AUTHORIZATION_KEY,
description = "The format of the key is ```\"Basic <token>\" or
\"Bearer <token>\"```"),
@ApiKeyAuthDefinition(name = DATABASE, in =
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = DATABASE,
description = "Database context passed through http header. If no
context is provided 'default' database "
- + "context will be considered.")}))
+ + "context will be considered.")
+}))
@Path("/")
public class PinotSegmentRestletResource {
private static final Logger LOGGER =
LoggerFactory.getLogger(PinotSegmentRestletResource.class);
@@ -372,7 +378,6 @@ public class PinotSegmentRestletResource {
return segmentZKMetadata != null ? segmentZKMetadata.toMap() : null;
}
-
/**
* Helper method to find the existing table based on the given table name
(with or without type suffix) and segment
* name.
@@ -404,10 +409,10 @@ public class PinotSegmentRestletResource {
+ "it again", notes = "Resets a segment by disabling and then
enabling it")
public SuccessResponse resetSegment(
@ApiParam(value = "Name of the table with type", required = true)
@PathParam("tableNameWithType")
- String tableNameWithType,
+ String tableNameWithType,
@ApiParam(value = "Name of the segment", required = true)
@PathParam("segmentName") @Encoded String segmentName,
@ApiParam(value = "Name of the target instance to reset")
@QueryParam("targetInstance") @Nullable
- String targetInstance, @Context HttpHeaders headers) {
+ String targetInstance, @Context HttpHeaders headers) {
tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType,
headers);
segmentName = URIUtils.decode(segmentName);
TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
@@ -444,9 +449,9 @@ public class PinotSegmentRestletResource {
+ " and finally enabling them", notes = "Resets segments by
disabling and then enabling them")
public SuccessResponse resetSegments(
@ApiParam(value = "Name of the table with type", required = true)
@PathParam("tableNameWithType")
- String tableNameWithType,
+ String tableNameWithType,
@ApiParam(value = "Name of the target instance to reset")
@QueryParam("targetInstance") @Nullable
- String targetInstance,
+ String targetInstance,
@ApiParam(value = "Whether to reset only segments with error state")
@QueryParam("errorSegmentsOnly")
@DefaultValue("false") boolean errorSegmentsOnly, @Context HttpHeaders
headers) {
tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType,
headers);
@@ -728,7 +733,6 @@ public class PinotSegmentRestletResource {
return segmentsMetadata;
}
-
@GET
@Path("segments/{tableNameWithType}/isStale")
@Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType",
action = Actions.Table.GET_METADATA)
@@ -745,7 +749,7 @@ public class PinotSegmentRestletResource {
TableMetadataReader tableMetadataReader =
new TableMetadataReader(_executor, _connectionManager,
_pinotHelixResourceManager);
return tableMetadataReader.getStaleSegments(tableNameWithType,
- _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+ _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
} catch (InvalidConfigException e) {
throw new ControllerApplicationException(LOGGER, e.getMessage(),
Status.BAD_REQUEST);
} catch (IOException ioe) {
@@ -779,6 +783,64 @@ public class PinotSegmentRestletResource {
return segmentToMetadataMap;
}
+ @GET
+ @Path("segments/{tableName}/invalidPartitionMetadata")
+ @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.GET_METADATA)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(
+ value = "Get the invalid partition metadata for all table segments",
+ notes = "Get the invalid partition metadata for all table segments"
+ )
+ public Map<String, String> getSegmentsWithInvalidPartitionMetadata(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName,
+ @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") @Nullable
String tableTypeStr,
+ @ApiParam(value = "Partition column") @QueryParam("partitionColumn")
@Nullable String partitionColumn,
+ @Context HttpHeaders headers) {
+ String tableNameWithType =
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager,
+ DatabaseUtils.translateTableName(tableName, headers),
Constants.validateTableType(tableTypeStr), LOGGER).get(0);
+ LOGGER.info("Received a request to fetch invalid partition metadata for
all segments for table {}",
+ tableNameWithType);
+ Map<String, String> segmentsWithInvalidPartitionMetadata = new HashMap<>();
+ List<SegmentZKMetadata> segmentZKMetadataList =
_pinotHelixResourceManager.getSegmentsZKMetadata(tableNameWithType);
+ for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
+ String partitionMetadataJson =
+
segmentZKMetadata.getSimpleFields().get(CommonConstants.Segment.PARTITION_METADATA);
+ if (!isValidPartitionMetadata(partitionMetadataJson, partitionColumn)) {
+
segmentsWithInvalidPartitionMetadata.put(segmentZKMetadata.getSegmentName(),
partitionMetadataJson);
+ }
+ }
+ return segmentsWithInvalidPartitionMetadata;
+ }
+
+ /// When the partition column is not specified, returns `true` when all columns
within the metadata each contain a single
+ /// partition. Treats `null` metadata as valid.
+ /// When the partition column is specified, returns `true` when the specified
column within the metadata contains a single
+ /// partition. Treats `null` metadata as invalid.
+ private boolean isValidPartitionMetadata(@Nullable String
partitionMetadataJson, @Nullable String partitionColumn) {
+ if (StringUtils.isBlank(partitionColumn)) {
+ partitionColumn = null;
+ }
+ if (partitionMetadataJson == null) {
+ return partitionColumn == null;
+ }
+ SegmentPartitionMetadata metadata;
+ try {
+ metadata =
SegmentPartitionMetadata.fromJsonString(partitionMetadataJson);
+ } catch (Exception e) {
+ return false;
+ }
+ if (partitionColumn == null) {
+ for (ColumnPartitionMetadata columnMetadata :
metadata.getColumnPartitionMap().values()) {
+ if (columnMetadata.getPartitions().size() != 1) {
+ return false;
+ }
+ }
+ return true;
+ }
+ Set<Integer> partitions = metadata.getPartitions(partitionColumn);
+ return partitions != null && partitions.size() == 1;
+ }
+
@GET
@Path("segments/{tableName}/tiers")
@Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.GET_STORAGE_TIER)
@@ -859,9 +921,9 @@ public class PinotSegmentRestletResource {
@ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName,
@ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String
tableTypeStr,
@ApiParam(value = "Start timestamp (inclusive) in milliseconds")
@QueryParam("startTimestamp") @DefaultValue("")
- String startTimestampStr,
+ String startTimestampStr,
@ApiParam(value = "End timestamp (exclusive) in milliseconds")
@QueryParam("endTimestamp") @DefaultValue("")
- String endTimestampStr,
+ String endTimestampStr,
@ApiParam(value = "Whether to exclude the segments overlapping with the
timestamps, false by default")
@QueryParam("excludeOverlapping") @DefaultValue("false") boolean
excludeOverlapping) {
long startTimestamp;
@@ -1120,39 +1182,39 @@ public class PinotSegmentRestletResource {
* @return
*/
private SuccessResponse updateZKTimeIntervalInternal(String
tableNameWithType) {
- TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
- if (tableConfig == null) {
- throw new ControllerApplicationException(LOGGER,
- "Failed to find table config for table: " + tableNameWithType,
Status.NOT_FOUND);
- }
+ TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ throw new ControllerApplicationException(LOGGER,
+ "Failed to find table config for table: " + tableNameWithType,
Status.NOT_FOUND);
+ }
- Schema tableSchema =
_pinotHelixResourceManager.getTableSchema(tableNameWithType);
- if (tableSchema == null) {
- throw new ControllerApplicationException(LOGGER,
- "Failed to find schema for table: " + tableNameWithType,
Status.NOT_FOUND);
- }
+ Schema tableSchema =
_pinotHelixResourceManager.getTableSchema(tableNameWithType);
+ if (tableSchema == null) {
+ throw new ControllerApplicationException(LOGGER,
+ "Failed to find schema for table: " + tableNameWithType,
Status.NOT_FOUND);
+ }
- String timeColumn =
tableConfig.getValidationConfig().getTimeColumnName();
- if (StringUtils.isEmpty(timeColumn)) {
- throw new ControllerApplicationException(LOGGER,
- "Failed to find time column for table : " + tableNameWithType,
Status.NOT_FOUND);
- }
+ String timeColumn = tableConfig.getValidationConfig().getTimeColumnName();
+ if (StringUtils.isEmpty(timeColumn)) {
+ throw new ControllerApplicationException(LOGGER,
+ "Failed to find time column for table : " + tableNameWithType,
Status.NOT_FOUND);
+ }
- DateTimeFieldSpec timeColumnFieldSpec =
tableSchema.getSpecForTimeColumn(timeColumn);
- if (timeColumnFieldSpec == null) {
- throw new ControllerApplicationException(LOGGER,
- String.format("Failed to find field spec for column: %s and table:
%s", timeColumn, tableNameWithType),
- Status.NOT_FOUND);
- }
+ DateTimeFieldSpec timeColumnFieldSpec =
tableSchema.getSpecForTimeColumn(timeColumn);
+ if (timeColumnFieldSpec == null) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Failed to find field spec for column: %s and table:
%s", timeColumn, tableNameWithType),
+ Status.NOT_FOUND);
+ }
- try {
-
_pinotHelixResourceManager.updateSegmentsZKTimeInterval(tableNameWithType,
timeColumnFieldSpec);
- } catch (Exception e) {
- throw new ControllerApplicationException(LOGGER,
- String.format("Failed to update time interval zk metadata for
table %s", tableNameWithType),
- Status.INTERNAL_SERVER_ERROR, e);
- }
- return new SuccessResponse("Successfully updated time interval zk
metadata for table: " + tableNameWithType);
+ try {
+
_pinotHelixResourceManager.updateSegmentsZKTimeInterval(tableNameWithType,
timeColumnFieldSpec);
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Failed to update time interval zk metadata for table
%s", tableNameWithType),
+ Status.INTERNAL_SERVER_ERROR, e);
+ }
+ return new SuccessResponse("Successfully updated time interval zk metadata
for table: " + tableNameWithType);
}
private List<Pair<TableType, List<String>>> selectSegments(
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java
index 97f14aa2ee7..8cc2a397c27 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java
@@ -27,19 +27,30 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.apache.helix.model.IdealState;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.mockito.InjectMocks;
+import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
public class PinotSegmentRestletResourceTest {
+ @Mock
+ PinotHelixResourceManager _pinotHelixResourceManager;
+
@InjectMocks
PinotSegmentRestletResource _pinotSegmentRestletResource;
@@ -132,6 +143,80 @@ public class PinotSegmentRestletResourceTest {
assertEquals(expectedResult, result);
}
+ @Test
+ public void testGetSegmentsWithInvalidPartitionMetadata()
+ throws TableNotFoundException {
+ String tableName = "testTable";
+ String tableNameWithType = "testTable_OFFLINE";
+
when(_pinotHelixResourceManager.getExistingTableNamesWithType(eq(tableName),
any()))
+ .thenReturn(List.of(tableNameWithType));
+
+ // seg0: null partition metadata, no column filter → valid
+ SegmentZKMetadata seg0 = new SegmentZKMetadata("seg0");
+
+ // seg1: single partition per column, no column filter → valid
+ SegmentZKMetadata seg1 = new SegmentZKMetadata("seg1");
+ seg1.getSimpleFields().put(CommonConstants.Segment.PARTITION_METADATA,
+
"{\"columnPartitionMap\":{\"col\":{\"functionName\":\"Modulo\",\"numPartitions\":4,\"partitions\":[0]}}}");
+
+ // seg2: multiple partitions for a column, no column filter → invalid
+ SegmentZKMetadata seg2 = new SegmentZKMetadata("seg2");
+ seg2.getSimpleFields().put(CommonConstants.Segment.PARTITION_METADATA,
+
"{\"columnPartitionMap\":{\"col\":{\"functionName\":\"Modulo\",\"numPartitions\":4,\"partitions\":[0,1]}}}");
+
+ // seg3: malformed JSON → invalid
+ SegmentZKMetadata seg3 = new SegmentZKMetadata("seg3");
+ seg3.getSimpleFields().put(CommonConstants.Segment.PARTITION_METADATA,
"not-valid-json");
+
+ when(_pinotHelixResourceManager.getSegmentsZKMetadata(tableNameWithType))
+ .thenReturn(List.of(seg0, seg1, seg2, seg3));
+
+ // no column filter: seg0 (null) is valid, seg1 is valid, seg2
(multi-partition) and seg3 (malformed) are invalid
+ Map<String, String> result =
+
_pinotSegmentRestletResource.getSegmentsWithInvalidPartitionMetadata(tableName,
"OFFLINE", null, null);
+ assertEquals(result.size(), 2);
+ assertTrue(result.containsKey("seg2"));
+ assertTrue(result.containsKey("seg3"));
+ }
+
+ @Test
+ public void testGetSegmentsWithInvalidPartitionMetadataWithColumnFilter()
+ throws TableNotFoundException {
+ String tableName = "testTable2";
+ String tableNameWithType = "testTable2_OFFLINE";
+
when(_pinotHelixResourceManager.getExistingTableNamesWithType(eq(tableName),
any()))
+ .thenReturn(List.of(tableNameWithType));
+
+ // seg0: null partition metadata, column specified → invalid
+ SegmentZKMetadata seg0 = new SegmentZKMetadata("seg0");
+
+ // seg1: single partition for the specified column → valid
+ SegmentZKMetadata seg1 = new SegmentZKMetadata("seg1");
+ seg1.getSimpleFields().put(CommonConstants.Segment.PARTITION_METADATA,
+
"{\"columnPartitionMap\":{\"col\":{\"functionName\":\"Modulo\",\"numPartitions\":4,\"partitions\":[2]}}}");
+
+ // seg2: multiple partitions for the specified column → invalid
+ SegmentZKMetadata seg2 = new SegmentZKMetadata("seg2");
+ seg2.getSimpleFields().put(CommonConstants.Segment.PARTITION_METADATA,
+
"{\"columnPartitionMap\":{\"col\":{\"functionName\":\"Modulo\",\"numPartitions\":4,\"partitions\":[0,1]}}}");
+
+ // seg3: specified column not present in partition metadata → invalid
+ SegmentZKMetadata seg3 = new SegmentZKMetadata("seg3");
+ seg3.getSimpleFields().put(CommonConstants.Segment.PARTITION_METADATA,
+
"{\"columnPartitionMap\":{\"otherCol\":{\"functionName\":\"Modulo\",\"numPartitions\":4,\"partitions\":[0]}}}");
+
+ when(_pinotHelixResourceManager.getSegmentsZKMetadata(tableNameWithType))
+ .thenReturn(List.of(seg0, seg1, seg2, seg3));
+
+ // with column filter "col": seg0 (null metadata), seg2 (multi-partition),
seg3 (column absent) are invalid
+ Map<String, String> result =
+
_pinotSegmentRestletResource.getSegmentsWithInvalidPartitionMetadata(tableName,
"OFFLINE", "col", null);
+ assertEquals(result.size(), 3);
+ assertTrue(result.containsKey("seg0"));
+ assertTrue(result.containsKey("seg2"));
+ assertTrue(result.containsKey("seg3"));
+ }
+
private List<String> getSegmentForPartition(String tableName, int
partitionID, int sequenceNumberOffset,
int numberOfSegments, long currentTime) {
List<String> segments = new ArrayList<>();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]