This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e77080007 Add an API to download validDocIdsSnapshots from peer 
servers (#10052)
8e77080007 is described below

commit 8e7708000785a6a20b54ac0efa3f6c2ebaae5195
Author: deemoliu <[email protected]>
AuthorDate: Fri Jan 13 12:35:02 2023 -0800

    Add an API to download validDocIdsSnapshots from peer servers (#10052)
---
 .../pinot/common/utils/RoaringBitmapUtils.java     |  3 +-
 .../server/api/resources/ServerResourceUtils.java  | 21 +++++++
 .../pinot/server/api/resources/TablesResource.java | 71 +++++++++++++++++-----
 .../pinot/server/api/TablesResourceTest.java       | 50 +++++++++++++++
 4 files changed, 129 insertions(+), 16 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/RoaringBitmapUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/RoaringBitmapUtils.java
index 861ac5c114..520ff8f233 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/RoaringBitmapUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/RoaringBitmapUtils.java
@@ -20,6 +20,7 @@ package org.apache.pinot.common.utils;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import org.roaringbitmap.ImmutableBitmapDataProvider;
 import org.roaringbitmap.RoaringBitmap;
 
 
@@ -27,7 +28,7 @@ public class RoaringBitmapUtils {
   private RoaringBitmapUtils() {
   }
 
-  public static byte[] serialize(RoaringBitmap bitmap) {
+  public static byte[] serialize(ImmutableBitmapDataProvider bitmap) {
     byte[] bytes = new byte[bitmap.serializedSizeInBytes()];
     ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
     bitmap.serialize(byteBuffer);
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ServerResourceUtils.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ServerResourceUtils.java
index 986d67d2d5..0159d8d542 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ServerResourceUtils.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ServerResourceUtils.java
@@ -19,9 +19,14 @@
 package org.apache.pinot.server.api.resources;
 
 import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.server.access.AccessControl;
+import org.apache.pinot.server.access.AccessControlFactory;
+import org.apache.pinot.server.access.HttpRequesterIdentity;
+import org.apache.pinot.server.access.RequesterIdentity;
 import org.apache.pinot.server.starter.ServerInstance;
 
 
@@ -54,4 +59,20 @@ public class ServerResourceUtils {
     }
     return instanceDataManager;
   }
+
+  public static void validateDataAccess(AccessControlFactory 
accessControlFactory, String tableNameWithType,
+      HttpHeaders httpHeaders) {
+    boolean hasDataAccess;
+    try {
+      AccessControl accessControl = accessControlFactory.create();
+      RequesterIdentity httpRequestIdentity = new 
HttpRequesterIdentity(httpHeaders);
+      hasDataAccess = accessControl.hasDataAccess(httpRequestIdentity, 
tableNameWithType);
+    } catch (Exception e) {
+      throw new WebApplicationException("Caught exception while validating 
access to table: " + tableNameWithType,
+          Response.Status.INTERNAL_SERVER_ERROR);
+    }
+    if (!hasDataAccess) {
+      throw new WebApplicationException("No data access to table: " + 
tableNameWithType, Response.Status.FORBIDDEN);
+    }
+  }
 }
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index 27df969782..4495d19ebb 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -69,6 +69,7 @@ import 
org.apache.pinot.common.restlet.resources.TableSegmentValidationInfo;
 import org.apache.pinot.common.restlet.resources.TableSegments;
 import org.apache.pinot.common.restlet.resources.TablesList;
 import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.RoaringBitmapUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
@@ -77,14 +78,13 @@ import 
org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
 import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.store.ColumnIndexType;
-import org.apache.pinot.server.access.AccessControl;
 import org.apache.pinot.server.access.AccessControlFactory;
-import org.apache.pinot.server.access.HttpRequesterIdentity;
-import org.apache.pinot.server.access.RequesterIdentity;
 import org.apache.pinot.server.api.AdminApiApplication;
 import org.apache.pinot.server.starter.ServerInstance;
 import org.apache.pinot.spi.config.table.TableType;
@@ -93,6 +93,7 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.stream.ConsumerPartitionState;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -374,18 +375,7 @@ public class TablesResource {
       throws Exception {
     LOGGER.info("Received a request to download segment {} for table {}", 
segmentName, tableNameWithType);
     // Validate data access
-    boolean hasDataAccess;
-    try {
-      AccessControl accessControl = _accessControlFactory.create();
-      RequesterIdentity httpRequestIdentity = new 
HttpRequesterIdentity(httpHeaders);
-      hasDataAccess = accessControl.hasDataAccess(httpRequestIdentity, 
tableNameWithType);
-    } catch (Exception e) {
-      throw new WebApplicationException("Caught exception while validating 
access to table: " + tableNameWithType,
-          Response.Status.INTERNAL_SERVER_ERROR);
-    }
-    if (!hasDataAccess) {
-      throw new WebApplicationException("No data access to table: " + 
tableNameWithType, Response.Status.FORBIDDEN);
-    }
+    ServerResourceUtils.validateDataAccess(_accessControlFactory, 
tableNameWithType, httpHeaders);
 
     TableDataManager tableDataManager =
         ServerResourceUtils.checkGetTableDataManager(_serverInstance, 
tableNameWithType);
@@ -423,6 +413,57 @@ public class TablesResource {
     }
   }
 
+  /**
+   * Download the validDocIds snapshot of the given immutable segment of an upsert table. This endpoint is used to
+   * fetch the snapshot from a peer server so that it does not have to be recomputed when segments are reloaded.
+   */
+  @GET
+  @Produces(MediaType.APPLICATION_OCTET_STREAM)
+  @Path("/segments/{tableNameWithType}/{segmentName}/validDocIds")
+  @ApiOperation(value = "Download validDocIds for an REALTIME immutable 
segment", notes = "Download validDocIds for "
+      + "an immutable segment in bitmap format.")
+  public Response downloadValidDocIds(
+      @ApiParam(value = "Name of the table with type REALTIME", required = 
true, example = "myTable_REALTIME")
+      @PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Name of the segment", required = true) 
@PathParam("segmentName") @Encoded String segmentName,
+      @Context HttpHeaders httpHeaders) {
+    LOGGER.info("Received a request to download validDocIds for segment {} 
table {}", segmentName, tableNameWithType);
+    // Validate data access
+    ServerResourceUtils.validateDataAccess(_accessControlFactory, 
tableNameWithType, httpHeaders);
+
+    TableDataManager tableDataManager =
+        ServerResourceUtils.checkGetTableDataManager(_serverInstance, 
tableNameWithType);
+    SegmentDataManager segmentDataManager = 
tableDataManager.acquireSegment(segmentName);
+    if (segmentDataManager == null) {
+      throw new WebApplicationException(
+          String.format("Table %s segment %s does not exist", 
tableNameWithType, segmentName),
+          Response.Status.NOT_FOUND);
+    }
+
+    try {
+      IndexSegment indexSegment = segmentDataManager.getSegment();
+      if (!(indexSegment instanceof ImmutableSegmentImpl)) {
+        throw new WebApplicationException(
+            String.format("Table %s segment %s is not a immutable segment", 
tableNameWithType, segmentName),
+            Response.Status.BAD_REQUEST);
+      }
+      MutableRoaringBitmap validDocIds =
+          indexSegment.getValidDocIds() != null ? 
indexSegment.getValidDocIds().getMutableRoaringBitmap() : null;
+      if (validDocIds == null) {
+        throw new WebApplicationException(
+            String.format("Missing validDocIds for table %s segment %s does 
not exist", tableNameWithType, segmentName),
+            Response.Status.NOT_FOUND);
+      }
+
+      byte[] validDocIdsBytes = RoaringBitmapUtils.serialize(validDocIds);
+      Response.ResponseBuilder builder = Response.ok(validDocIdsBytes);
+      builder.header(HttpHeaders.CONTENT_LENGTH, validDocIdsBytes.length);
+      return builder.build();
+    } finally {
+      tableDataManager.releaseSegment(segmentDataManager);
+    }
+  }
+
   /**
    * Upload a low level consumer segment to segment store and return the 
segment download url. This endpoint is used
    * when segment store copy is unavailable for committed low level consumer 
segments.
diff --git 
a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
 
b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
index c60ffd1b84..ce7ed438cc 100644
--- 
a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
+++ 
b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
@@ -21,6 +21,7 @@ package org.apache.pinot.server.api;
 import com.fasterxml.jackson.databind.JsonNode;
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
 import javax.ws.rs.core.Response;
 import org.apache.commons.io.FileUtils;
@@ -28,18 +29,24 @@ import 
org.apache.pinot.common.restlet.resources.TableMetadataInfo;
 import org.apache.pinot.common.restlet.resources.TableSegments;
 import org.apache.pinot.common.restlet.resources.TablesList;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import 
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.segment.spi.store.ColumnIndexType;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.mockito.Mockito.mock;
+
 
 public class TablesResourceTest extends BaseResourceTest {
   @Test
@@ -222,6 +229,24 @@ public class TablesResourceTest extends BaseResourceTest {
     Assert.assertEquals(response.getStatus(), 
Response.Status.NOT_FOUND.getStatusCode());
   }
 
+  @Test
+  public void testDownloadValidDocIdsSnapshot()
+      throws Exception {
+    // Verify the content of the downloaded snapshot from a realtime table.
+    
downLoadAndVerifyValidDocIdsSnapshot(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
+        (ImmutableSegmentImpl) _realtimeIndexSegments.get(0));
+
+    // Verify non-existent table and segment download return NOT_FOUND status.
+    Response response = 
_webTarget.path("/tables/UNKNOWN_REALTIME/segments/segmentname/validDocIds").request()
+        .get(Response.class);
+    Assert.assertEquals(response.getStatus(), 
Response.Status.NOT_FOUND.getStatusCode());
+
+    response = _webTarget.path(
+        String.format("/tables/%s/segments/%s/validDocIds", 
TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
+            "UNKNOWN_SEGMENT")).request().get(Response.class);
+    Assert.assertEquals(response.getStatus(), 
Response.Status.NOT_FOUND.getStatusCode());
+  }
+
   // Verify metadata file from segments.
   private void downLoadAndVerifySegmentContent(String tableNameWithType, 
IndexSegment segment)
       throws IOException {
@@ -250,6 +275,31 @@ public class TablesResourceTest extends BaseResourceTest {
     FileUtils.forceDelete(tempMetadataDir);
   }
 
+  // Verify that the validDocIds snapshot downloaded from the server matches the segment's validDocIds.
+  private void downLoadAndVerifyValidDocIdsSnapshot(String tableNameWithType, 
ImmutableSegmentImpl segment)
+      throws IOException {
+
+    String snapshotPath = "/segments/" + tableNameWithType + "/" + 
segment.getSegmentName() + "/validDocIds";
+
+    PartitionUpsertMetadataManager upsertMetadataManager = 
mock(PartitionUpsertMetadataManager.class);
+    ThreadSafeMutableRoaringBitmap validDocIds = new 
ThreadSafeMutableRoaringBitmap();
+    int[] docIds = new int[]{1, 4, 6, 10, 15, 17, 18, 20};
+    for (int docId: docIds) {
+      validDocIds.add(docId);
+    }
+    segment.enableUpsert(upsertMetadataManager, validDocIds);
+
+    // Download the snapshot in byte[] format.
+    Response response = 
_webTarget.path(snapshotPath).request().get(Response.class);
+    Assert.assertEquals(response.getStatus(), 
Response.Status.OK.getStatusCode());
+    byte[] snapshot = response.readEntity(byte[].class);
+
+    // Load the snapshot file.
+    Assert.assertNotNull(snapshot);
+    Assert.assertEquals(new 
ImmutableRoaringBitmap(ByteBuffer.wrap(snapshot)).toMutableRoaringBitmap(),
+        validDocIds.getMutableRoaringBitmap());
+  }
+
   @Test
   public void testUploadSegments()
       throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to