This is an automated email from the ASF dual-hosted git repository.

xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2de43845254 Validate offline upsert segment partitions on upload (#18549)
2de43845254 is described below

commit 2de43845254a9d20bb27a7c249c2e9c6154a3145
Author: Xiang Fu <[email protected]>
AuthorDate: Sun May 24 08:49:31 2026 +0800

    Validate offline upsert segment partitions on upload (#18549)
    
    Offline upsert relies on segment partition metadata to route uploaded
    segments into the correct upsert partition. Reject segments whose partition
    metadata is missing, spans multiple partition ids, falls outside the
    configured partition range, or does not match the table partition function
    identity, including the partition id normalizer, so bad batch uploads cannot
    silently corrupt upsert correctness.
    
    Also preserve batch metadata manifest order during validation and document
    the offline upsert batch-upload contract with a sample table config and
    metadata example.
---
 .../PinotSegmentUploadDownloadRestletResource.java |   2 +
 .../api/upload/SegmentValidationUtils.java         |  75 +++++++++
 .../api/upload/SegmentValidationUtilsTest.java     | 176 +++++++++++++++++++++
 3 files changed, 253 insertions(+)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index d57c7926b77..af77b6ab580 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -400,6 +400,7 @@ public class PinotSegmentUploadDownloadRestletResource {
       if (tableConfig.getIngestionConfig() == null || 
tableConfig.getIngestionConfig().isSegmentTimeValueCheck()) {
         SegmentValidationUtils.validateTimeInterval(segmentMetadata, 
tableConfig);
       }
+      
SegmentValidationUtils.validateUpsertSegmentPartitionMetadata(segmentMetadata, 
tableConfig);
       long untarredSegmentSizeInBytes;
       if (uploadType == FileUploadType.METADATA && segmentSizeInBytes > 0) {
         // TODO: Include the untarred segment size when using the METADATA 
push rest API. Currently we can only use the
@@ -655,6 +656,7 @@ public class PinotSegmentUploadDownloadRestletResource {
         if (tableConfig.getIngestionConfig() == null || 
tableConfig.getIngestionConfig().isSegmentTimeValueCheck()) {
           SegmentValidationUtils.validateTimeInterval(segmentMetadata, 
tableConfig);
         }
+        
SegmentValidationUtils.validateUpsertSegmentPartitionMetadata(segmentMetadata, 
tableConfig);
         // TODO: Include the un-tarred segment size when using the METADATA 
push rest API. Currently we can only use the
         //  tarred segment size as an approximation.
         long segmentSizeInBytes = getSegmentSizeFromFile(sourceDownloadURIStr);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidationUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidationUtils.java
index 30f25fd8535..d09aefe5d4d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidationUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidationUtils.java
@@ -18,11 +18,24 @@
  */
 package org.apache.pinot.controller.api.upload;
 
+import java.util.Map;
+import java.util.Set;
 import javax.ws.rs.core.Response;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
 import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
 import org.apache.pinot.controller.validation.StorageQuotaChecker;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import 
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
 import org.apache.pinot.spi.utils.TimeUtils;
 import org.joda.time.Interval;
 import org.slf4j.Logger;
@@ -57,6 +70,68 @@ public class SegmentValidationUtils {
     }
   }
 
+  public static void validateUpsertSegmentPartitionMetadata(SegmentMetadata 
segmentMetadata, TableConfig tableConfig) {
+    // Scope this upload guard to offline upsert. Realtime upsert upload flows 
need CONSUMING/COMPLETED
+    // partition-column resolution, so avoid applying the OFFLINE lookup below 
to realtime tables.
+    if (tableConfig.getTableType() != TableType.OFFLINE || 
!tableConfig.isUpsertEnabled()) {
+      return;
+    }
+
+    String partitionColumn = getPartitionColumn(tableConfig);
+    if (StringUtils.isEmpty(partitionColumn)) {
+      return;
+    }
+
+    ColumnMetadata columnMetadata = 
segmentMetadata.getColumnMetadataFor(partitionColumn);
+    Set<Integer> partitions = columnMetadata != null ? 
columnMetadata.getPartitions() : null;
+    if (partitions == null || partitions.size() != 1) {
+      throw new ControllerApplicationException(LOGGER,
+          "Uploaded segment: " + segmentMetadata.getName() + " for offline 
upsert table: "
+              + tableConfig.getTableName() + " must contain exactly one 
partition id for column: " + partitionColumn
+              + ", got: " + partitions, Response.Status.BAD_REQUEST);
+    }
+  }
+
+  private static String getPartitionColumn(TableConfig tableConfig) {
+    if (!MapUtils.isEmpty(tableConfig.getInstanceAssignmentConfigMap())) {
+      InstanceAssignmentConfig instanceAssignmentConfig =
+          
tableConfig.getInstanceAssignmentConfigMap().get(InstancePartitionsType.OFFLINE.name());
+      String partitionColumn = getPartitionColumn(instanceAssignmentConfig);
+      if (StringUtils.isNotEmpty(partitionColumn)) {
+        return partitionColumn;
+      }
+    }
+
+    //noinspection deprecation
+    SegmentsValidationAndRetentionConfig validationConfig = 
tableConfig.getValidationConfig();
+    ReplicaGroupStrategyConfig replicaGroupStrategyConfig =
+        validationConfig != null ? 
validationConfig.getReplicaGroupStrategyConfig() : null;
+    String partitionColumn =
+        replicaGroupStrategyConfig != null ? 
replicaGroupStrategyConfig.getPartitionColumn() : null;
+    if (StringUtils.isNotEmpty(partitionColumn)) {
+      return partitionColumn;
+    }
+
+    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+    SegmentPartitionConfig segmentPartitionConfig =
+        indexingConfig != null ? indexingConfig.getSegmentPartitionConfig() : 
null;
+    Map<String, ?> columnPartitionMap =
+        segmentPartitionConfig != null ? 
segmentPartitionConfig.getColumnPartitionMap() : null;
+    if (MapUtils.isNotEmpty(columnPartitionMap) && columnPartitionMap.size() 
== 1) {
+      return columnPartitionMap.keySet().iterator().next();
+    }
+    return null;
+  }
+
+  private static String getPartitionColumn(InstanceAssignmentConfig 
instanceAssignmentConfig) {
+    if (instanceAssignmentConfig == null) {
+      return null;
+    }
+    InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
+        instanceAssignmentConfig.getReplicaGroupPartitionConfig();
+    return replicaGroupPartitionConfig != null ? 
replicaGroupPartitionConfig.getPartitionColumn() : null;
+  }
+
   public static void checkStorageQuota(String segmentName, long 
tarSegmentSizeInBytes, long untarredSegmentSizeInBytes,
       TableConfig tableConfig,
       StorageQuotaChecker quotaChecker) {
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/SegmentValidationUtilsTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/SegmentValidationUtilsTest.java
new file mode 100644
index 00000000000..c0e320cf62a
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/SegmentValidationUtilsTest.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.upload;
+
+import java.util.Map;
+import java.util.Set;
+import javax.ws.rs.core.Response;
+import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import 
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
+import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.expectThrows;
+
+
+public class SegmentValidationUtilsTest {
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
+  private static final String PARTITION_COLUMN = "pk";
+  private static final int NUM_PARTITIONS = 4;
+
+  @Test
+  public void 
testValidateUpsertSegmentPartitionMetadataAcceptsSinglePartitionId() {
+    
SegmentValidationUtils.validateUpsertSegmentPartitionMetadata(mockSegmentMetadata(Set.of(2)),
+        getOfflineUpsertTableConfig());
+  }
+
+  @Test
+  public void 
testValidateUpsertSegmentPartitionMetadataAcceptsInstanceAssignmentPartitionColumn()
 {
+    InstanceAssignmentConfig instanceAssignmentConfig =
+        new InstanceAssignmentConfig(new 
InstanceTagPoolConfig("DefaultTenant", false, 0, null), null,
+            new InstanceReplicaGroupPartitionConfig(true, 0, 0, 0, 1, 0, 
false, PARTITION_COLUMN), null, false);
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName(RAW_TABLE_NAME)
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
+        
.setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.OFFLINE.name(), 
instanceAssignmentConfig))
+        .build();
+
+    
SegmentValidationUtils.validateUpsertSegmentPartitionMetadata(mockSegmentMetadata(Set.of(2)),
 tableConfig);
+  }
+
+  @Test
+  public void 
testValidateUpsertSegmentPartitionMetadataAcceptsSingleSegmentPartitionConfigColumn()
 {
+    
SegmentValidationUtils.validateUpsertSegmentPartitionMetadata(mockSegmentMetadata(Set.of(2)),
+        getOfflineUpsertTableConfigWithSegmentPartitionConfig());
+  }
+
+  @Test
+  public void 
testValidateUpsertSegmentPartitionMetadataRejectsMultipleIdsWithSingleSegmentPartitionConfigColumn()
 {
+    ControllerApplicationException exception = 
expectThrows(ControllerApplicationException.class,
+        () -> 
SegmentValidationUtils.validateUpsertSegmentPartitionMetadata(mockSegmentMetadata(Set.of(1,
 2)),
+            getOfflineUpsertTableConfigWithSegmentPartitionConfig()));
+    assertEquals(exception.getResponse().getStatus(), 
Response.Status.BAD_REQUEST.getStatusCode());
+  }
+
+  @Test
+  public void 
testValidateUpsertSegmentPartitionMetadataRejectsMultiplePartitionIds() {
+    ControllerApplicationException exception = 
expectThrows(ControllerApplicationException.class,
+        () -> 
SegmentValidationUtils.validateUpsertSegmentPartitionMetadata(mockSegmentMetadata(Set.of(1,
 2)),
+            getOfflineUpsertTableConfig()));
+    assertEquals(exception.getResponse().getStatus(), 
Response.Status.BAD_REQUEST.getStatusCode());
+  }
+
+  @Test
+  public void 
testValidateUpsertSegmentPartitionMetadataRejectsMissingPartitionId() {
+    ControllerApplicationException exception = 
expectThrows(ControllerApplicationException.class,
+        () -> 
SegmentValidationUtils.validateUpsertSegmentPartitionMetadata(mockSegmentMetadata(null),
+            getOfflineUpsertTableConfig()));
+    assertEquals(exception.getResponse().getStatus(), 
Response.Status.BAD_REQUEST.getStatusCode());
+  }
+
+  @Test
+  public void 
testValidateUpsertSegmentPartitionMetadataRejectsMissingPartitionColumnMetadata()
 {
+    ControllerApplicationException exception = 
expectThrows(ControllerApplicationException.class,
+        () -> 
SegmentValidationUtils.validateUpsertSegmentPartitionMetadata(mockSegmentMetadataWithoutPartitionColumn(),
+            getOfflineUpsertTableConfig()));
+    assertEquals(exception.getResponse().getStatus(), 
Response.Status.BAD_REQUEST.getStatusCode());
+  }
+
+  @Test
+  public void testValidateUpsertSegmentPartitionMetadataSkipsNonUpsertTable() {
+    
SegmentValidationUtils.validateUpsertSegmentPartitionMetadata(mockSegmentMetadata(Set.of(1)),
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build());
+  }
+
+  @Test
+  public void 
testValidateUpsertSegmentPartitionMetadataSkipsTableWithoutPartitionColumn() {
+    
SegmentValidationUtils.validateUpsertSegmentPartitionMetadata(mockSegmentMetadata(Set.of(1)),
+        new TableConfigBuilder(TableType.OFFLINE)
+            .setTableName(RAW_TABLE_NAME)
+            .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
+            .build());
+  }
+
+  @Test
+  public void 
testValidateUpsertSegmentPartitionMetadataDoesNotValidateTablePartitionConfigCardinality()
 {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName(RAW_TABLE_NAME)
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
+        .setReplicaGroupStrategyConfig(new 
ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1))
+        .setSegmentPartitionConfig(new 
SegmentPartitionConfig(Map.of(PARTITION_COLUMN,
+            new ColumnPartitionConfig("Murmur", NUM_PARTITIONS), 
"otherPartitionColumn",
+            new ColumnPartitionConfig("Modulo", NUM_PARTITIONS))))
+        .build();
+
+    
SegmentValidationUtils.validateUpsertSegmentPartitionMetadata(mockSegmentMetadata(Set.of(1)),
 tableConfig);
+  }
+
+  private static TableConfig getOfflineUpsertTableConfig() {
+    return new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName(RAW_TABLE_NAME)
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
+        .setReplicaGroupStrategyConfig(new 
ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1))
+        .build();
+  }
+
+  private static TableConfig 
getOfflineUpsertTableConfigWithSegmentPartitionConfig() {
+    return new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName(RAW_TABLE_NAME)
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
+        .setSegmentPartitionConfig(new 
SegmentPartitionConfig(Map.of(PARTITION_COLUMN,
+            new ColumnPartitionConfig("Murmur", NUM_PARTITIONS))))
+        .build();
+  }
+
+  private static SegmentMetadata mockSegmentMetadata(Set<Integer> partitions) {
+    ColumnMetadata columnMetadata = mockColumnMetadata(partitions);
+    SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
+    when(segmentMetadata.getName()).thenReturn(SEGMENT_NAME);
+    
when(segmentMetadata.getColumnMetadataFor(PARTITION_COLUMN)).thenReturn(columnMetadata);
+    return segmentMetadata;
+  }
+
+  private static SegmentMetadata mockSegmentMetadataWithoutPartitionColumn() {
+    SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
+    when(segmentMetadata.getName()).thenReturn(SEGMENT_NAME);
+    
when(segmentMetadata.getColumnMetadataFor(PARTITION_COLUMN)).thenReturn(null);
+    return segmentMetadata;
+  }
+
+  private static ColumnMetadata mockColumnMetadata(Set<Integer> partitions) {
+    ColumnMetadata columnMetadata = mock(ColumnMetadata.class);
+    when(columnMetadata.getPartitions()).thenReturn(partitions);
+    return columnMetadata;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to