This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c92d677  Add a new server api for download of segments. (#5221)
c92d677 is described below

commit c92d6773c3c1539f9e9ec533dbe980fbaaeee6b4
Author: Ting Chen <[email protected]>
AuthorDate: Wed Apr 15 09:16:56 2020 -0700

    Add a new server api for download of segments. (#5221)
    
    * Add a new server api for download of segments.
    
    * Revise based on feedback.
    
    * Revise the getTableDir and delete tar.gz files on exit.
    
    * Store temp tar.gz file in the server's segment tar folder.
    
    * Further revision based on feedbacks.
    
    * Fix tests.
    
    * Minor typo.
---
 .../core/data/manager/BaseTableDataManager.java    |   5 +
 .../pinot/core/data/manager/TableDataManager.java  |   6 ++
 .../pinot/server/api/resources/TablesResource.java |  57 +++++++++++
 .../apache/pinot/server/api/BaseResourceTest.java  |  46 ++++++---
 .../pinot/server/api/TableSizeResourceTest.java    |  55 ++++++----
 .../pinot/server/api/TablesResourceTest.java       | 114 ++++++++++++++++++---
 6 files changed, 231 insertions(+), 52 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 6ea0b9a..ea7279c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -211,4 +211,9 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
   public String getTableName() {
     return _tableNameWithType;
   }
+
+  @Override
+  public File getTableDataDir() {
+    return _indexDir;
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java
index 79a3445..dc9e193 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java
@@ -116,4 +116,10 @@ public interface TableDataManager {
    * Returns the table name managed by this instance.
    */
   String getTableName();
+
+  /**
+   * Returns the dir which contains the data segments.
+   * @return
+   */
+  File getTableDataDir();
 }
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index 662d4e0..91a2008 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -23,25 +23,34 @@ import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
+import java.io.File;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import javax.inject.Inject;
 import javax.ws.rs.DefaultValue;
+import javax.ws.rs.Encoded;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.restlet.resources.ResourceUtils;
 import org.apache.pinot.common.restlet.resources.TableSegments;
 import org.apache.pinot.common.restlet.resources.TablesList;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.SegmentDataManager;
 import org.apache.pinot.core.data.manager.TableDataManager;
@@ -55,6 +64,7 @@ import org.slf4j.LoggerFactory;
 @Path("/")
 public class TablesResource {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TablesResource.class);
+  private static final String PEER_SEGMENT_DOWNLOAD_DIR = 
"peerSegmentDownloadDir";
 
   @Inject
   ServerInstance serverInstance;
@@ -175,4 +185,51 @@ public class TablesResource {
       }
     }
   }
+
+  // TODO Add access control similar to 
PinotSegmentUploadDownloadRestletResource for segment download.
+  @GET
+  @Produces(MediaType.APPLICATION_OCTET_STREAM)
+  @Path("/segments/{tableNameWithType}/{segmentName}")
+  @ApiOperation(value = "Download an immutable segment", notes = "Download an 
immutable segment in zipped tar format.")
+  public Response downloadSegment(
+      @ApiParam(value = "Name of the table with type REALTIME OR OFFLINE", 
required = true, example = "myTable_OFFLINE") @PathParam("tableNameWithType") 
String tableNameWithType,
+      @ApiParam(value = "Name of the segment", required = true) 
@PathParam("segmentName") @Encoded String segmentName,
+      @Context HttpHeaders httpHeaders)
+      throws Exception {
+    LOGGER.info("Received a request to download segment {} for table {}", 
segmentName, tableNameWithType);
+    TableDataManager tableDataManager = 
checkGetTableDataManager(tableNameWithType);
+    SegmentDataManager segmentDataManager = 
tableDataManager.acquireSegment(segmentName);
+    if (segmentDataManager == null) {
+      throw new WebApplicationException(
+          String.format("Table %s segment %s does not exist", 
tableNameWithType, segmentName),
+          Response.Status.NOT_FOUND);
+    }
+    try {
+      // TODO Limit the number of concurrent downloads of segments because 
compression is an expensive operation.
+      // Store the tar.gz segment file in the server's segmentTarDir folder 
with a unique file name.
+      // Note that two clients asking the same segment file will result in the 
same tar.gz files being created twice.
+      // Will revisit for optimization if performance becomes an issue.
+      File tmpSegmentTarDir = new 
File(serverInstance.getInstanceDataManager().getSegmentFileDirectory(), 
PEER_SEGMENT_DOWNLOAD_DIR);
+      tmpSegmentTarDir.mkdir();
+
+      String tarFilePath = TarGzCompressionUtils
+          .createTarGzOfDirectory(new File(tableDataManager.getTableDataDir(), 
segmentName).getAbsolutePath(),
+              new File(tmpSegmentTarDir, tableNameWithType+ "_" + segmentName 
+ "_" + UUID.randomUUID()).getAbsolutePath());
+      File tarFile = new File(tarFilePath);
+      tarFile.deleteOnExit();
+      Response.ResponseBuilder builder = Response.ok();
+      builder.entity((StreamingOutput) output -> {
+        try {
+          Files.copy(tarFile.toPath(), output);
+        } finally {
+          FileUtils.deleteQuietly(tarFile);
+        }
+      });
+      builder.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=" 
+ tarFile.getName());
+      builder.header(HttpHeaders.CONTENT_LENGTH, tarFile.length());
+      return builder.build();
+    } finally {
+      tableDataManager.releaseSegment(segmentDataManager);
+    }
+  }
 }
diff --git 
a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java 
b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
index 08ab8c3..f4b5b1e 100644
--- 
a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
+++ 
b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.client.WebTarget;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -43,6 +44,7 @@ import 
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl
 import org.apache.pinot.segments.v1.creator.SegmentTestUtils;
 import org.apache.pinot.server.starter.ServerInstance;
 import org.apache.pinot.server.starter.helix.AdminApiApplication;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -58,8 +60,8 @@ public abstract class BaseResourceTest {
   protected static final String TABLE_NAME = "testTable";
 
   private final Map<String, TableDataManager> _tableDataManagerMap = new 
HashMap<>();
-  protected final List<ImmutableSegment> _indexSegments = new ArrayList<>();
-
+  protected final List<ImmutableSegment> _realtimeIndexSegments = new 
ArrayList<>();
+  protected final List<ImmutableSegment> _offlineIndexSegments = new 
ArrayList<>();
   private File _avroFile;
   private AdminApiApplication _adminApiApplication;
   protected WebTarget _webTarget;
@@ -83,10 +85,16 @@ public abstract class BaseResourceTest {
     // Mock the server instance
     ServerInstance serverInstance = mock(ServerInstance.class);
     
when(serverInstance.getInstanceDataManager()).thenReturn(instanceDataManager);
+    
when(serverInstance.getInstanceDataManager().getSegmentFileDirectory()).thenReturn(FileUtils.getTempDirectoryPath());
+
+    // Add the default tables and segments.
+    String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME);
+    String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME);
 
-    // Add the default table and segment
-    addTable(TABLE_NAME);
-    setUpSegment("default");
+    addTable(realtimeTableName);
+    addTable(offlineTableName);
+    setUpSegment(realtimeTableName, "default", _realtimeIndexSegments);
+    setUpSegment(offlineTableName, "default", _offlineIndexSegments);
 
     _adminApiApplication = new AdminApiApplication(serverInstance);
     _adminApiApplication.start(CommonConstants.Server.DEFAULT_ADMIN_API_PORT);
@@ -96,46 +104,50 @@ public abstract class BaseResourceTest {
   @AfterClass
   public void tearDown() {
     _adminApiApplication.stop();
-    for (ImmutableSegment immutableSegment : _indexSegments) {
+    for (ImmutableSegment immutableSegment : _realtimeIndexSegments) {
+      immutableSegment.destroy();
+    }
+    for (ImmutableSegment immutableSegment : _offlineIndexSegments) {
       immutableSegment.destroy();
     }
     FileUtils.deleteQuietly(INDEX_DIR);
   }
 
-  protected List<ImmutableSegment> setUpSegments(int numSegments)
+  protected List<ImmutableSegment> setUpSegments(String tableNameWithType, int 
numSegments, List<ImmutableSegment> segments)
       throws Exception {
     List<ImmutableSegment> immutableSegments = new ArrayList<>();
     for (int i = 0; i < numSegments; i++) {
-      
immutableSegments.add(setUpSegment(Integer.toString(_indexSegments.size())));
+      immutableSegments.add(setUpSegment(tableNameWithType, 
Integer.toString(_realtimeIndexSegments.size()), segments));
     }
     return immutableSegments;
   }
 
-  protected ImmutableSegment setUpSegment(String segmentNamePostfix)
+  protected ImmutableSegment setUpSegment(String tableNameWithType, String 
segmentNamePostfix, List<ImmutableSegment> segments)
       throws Exception {
     SegmentGeneratorConfig config =
-        SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(_avroFile, 
INDEX_DIR, TABLE_NAME);
+        SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(_avroFile, 
INDEX_DIR, tableNameWithType);
     config.setSegmentNamePostfix(segmentNamePostfix);
     SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
     driver.init(config);
     driver.build();
     ImmutableSegment immutableSegment =
         ImmutableSegmentLoader.load(new File(INDEX_DIR, 
driver.getSegmentName()), ReadMode.mmap);
-    _indexSegments.add(immutableSegment);
-    _tableDataManagerMap.get(TABLE_NAME).addSegment(immutableSegment);
+    segments.add(immutableSegment);
+    _tableDataManagerMap.get(tableNameWithType).addSegment(immutableSegment);
     return immutableSegment;
   }
 
   @SuppressWarnings("unchecked")
-  protected void addTable(String tableName) {
+  protected void addTable(String tableNameWithType) {
     TableDataManagerConfig tableDataManagerConfig = 
mock(TableDataManagerConfig.class);
-    
when(tableDataManagerConfig.getTableDataManagerType()).thenReturn("OFFLINE");
-    when(tableDataManagerConfig.getTableName()).thenReturn(tableName);
-    
when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
+    when(tableDataManagerConfig.getTableDataManagerType())
+        
.thenReturn(TableNameBuilder.getTableTypeFromTableName(tableNameWithType).name());
+    when(tableDataManagerConfig.getTableName()).thenReturn(tableNameWithType);
+    
when(tableDataManagerConfig.getDataDir()).thenReturn(INDEX_DIR.getAbsolutePath());
     TableDataManager tableDataManager = TableDataManagerProvider
         .getTableDataManager(tableDataManagerConfig, "testInstance", 
mock(ZkHelixPropertyStore.class),
             mock(ServerMetrics.class));
     tableDataManager.start();
-    _tableDataManagerMap.put(tableName, tableDataManager);
+    _tableDataManagerMap.put(tableNameWithType, tableDataManager);
   }
 }
diff --git 
a/pinot-server/src/test/java/org/apache/pinot/server/api/TableSizeResourceTest.java
 
b/pinot-server/src/test/java/org/apache/pinot/server/api/TableSizeResourceTest.java
index f30b291..790d120 100644
--- 
a/pinot-server/src/test/java/org/apache/pinot/server/api/TableSizeResourceTest.java
+++ 
b/pinot-server/src/test/java/org/apache/pinot/server/api/TableSizeResourceTest.java
@@ -21,13 +21,12 @@ package org.apache.pinot.server.api;
 import javax.ws.rs.core.Response;
 import org.apache.pinot.common.restlet.resources.TableSizeInfo;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 
 public class TableSizeResourceTest extends BaseResourceTest {
-  private static final String TABLE_SIZE_PATH = "/tables/" + TABLE_NAME + 
"/size";
-
   @Test
   public void testTableSizeNotFound() {
     Response response = 
_webTarget.path("table/unknownTable/size").request().get(Response.class);
@@ -36,38 +35,54 @@ public class TableSizeResourceTest extends BaseResourceTest 
{
 
   @Test
   public void testTableSizeDetailed() {
-    TableSizeInfo tableSizeInfo = 
_webTarget.path(TABLE_SIZE_PATH).request().get(TableSizeInfo.class);
-    ImmutableSegment defaultSegment = _indexSegments.get(0);
+    
verifyTableSizeDetailedImpl(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
 _realtimeIndexSegments.get(0));
+    
verifyTableSizeDetailedImpl(TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME),
 _offlineIndexSegments.get(0));
+  }
 
-    Assert.assertEquals(tableSizeInfo.tableName, TABLE_NAME);
-    Assert.assertEquals(tableSizeInfo.diskSizeInBytes, 
defaultSegment.getSegmentSizeBytes());
+  private void verifyTableSizeDetailedImpl(String expectedTableName, 
ImmutableSegment segment) {
+    String path = "/tables/" + expectedTableName + "/size";
+    TableSizeInfo tableSizeInfo = 
_webTarget.path(path).request().get(TableSizeInfo.class);
+
+    Assert.assertEquals(tableSizeInfo.tableName, expectedTableName);
+    Assert.assertEquals(tableSizeInfo.diskSizeInBytes, 
segment.getSegmentSizeBytes());
     Assert.assertEquals(tableSizeInfo.segments.size(), 1);
-    Assert.assertEquals(tableSizeInfo.segments.get(0).segmentName, 
defaultSegment.getSegmentName());
-    Assert.assertEquals(tableSizeInfo.segments.get(0).diskSizeInBytes, 
defaultSegment.getSegmentSizeBytes());
-    Assert.assertEquals(tableSizeInfo.diskSizeInBytes, 
defaultSegment.getSegmentSizeBytes());
+    Assert.assertEquals(tableSizeInfo.segments.get(0).segmentName, 
segment.getSegmentName());
+    Assert.assertEquals(tableSizeInfo.segments.get(0).diskSizeInBytes, 
segment.getSegmentSizeBytes());
+    Assert.assertEquals(tableSizeInfo.diskSizeInBytes, 
segment.getSegmentSizeBytes());
   }
 
   @Test
   public void testTableSizeNoDetails() {
+    
verifyTableSizeNoDetailsImpl(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
+        _realtimeIndexSegments.get(0));
+    
verifyTableSizeNoDetailsImpl(TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME),
 _offlineIndexSegments.get(0));
+  }
+
+  private void verifyTableSizeNoDetailsImpl(String expectedTableName, 
ImmutableSegment segment) {
+    String path = "/tables/" + expectedTableName + "/size";
     TableSizeInfo tableSizeInfo =
-        _webTarget.path(TABLE_SIZE_PATH).queryParam("detailed", 
"false").request().get(TableSizeInfo.class);
-    ImmutableSegment defaultSegment = _indexSegments.get(0);
+        _webTarget.path(path).queryParam("detailed", 
"false").request().get(TableSizeInfo.class);
 
-    Assert.assertEquals(tableSizeInfo.tableName, TABLE_NAME);
-    Assert.assertEquals(tableSizeInfo.diskSizeInBytes, 
defaultSegment.getSegmentSizeBytes());
+    Assert.assertEquals(tableSizeInfo.tableName, expectedTableName);
+    Assert.assertEquals(tableSizeInfo.diskSizeInBytes, 
segment.getSegmentSizeBytes());
     Assert.assertEquals(tableSizeInfo.segments.size(), 0);
   }
 
   @Test
   public void testTableSizeOld() {
-    TableSizeInfo tableSizeInfo = _webTarget.path("/table/" + TABLE_NAME + 
"/size").request().get(TableSizeInfo.class);
-    ImmutableSegment defaultSegment = _indexSegments.get(0);
+    
verifyTableSizeOldImpl(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), 
_realtimeIndexSegments.get(0));
+    
verifyTableSizeOldImpl(TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME), 
_offlineIndexSegments.get(0));
+  }
+
+  private void verifyTableSizeOldImpl(String expectedTableName, 
ImmutableSegment segment) {
+    String path = "/table/" + expectedTableName + "/size";
+    TableSizeInfo tableSizeInfo = 
_webTarget.path(path).request().get(TableSizeInfo.class);
 
-    Assert.assertEquals(tableSizeInfo.tableName, TABLE_NAME);
-    Assert.assertEquals(tableSizeInfo.diskSizeInBytes, 
defaultSegment.getSegmentSizeBytes());
+    Assert.assertEquals(tableSizeInfo.tableName, expectedTableName);
+    Assert.assertEquals(tableSizeInfo.diskSizeInBytes, 
segment.getSegmentSizeBytes());
     Assert.assertEquals(tableSizeInfo.segments.size(), 1);
-    Assert.assertEquals(tableSizeInfo.segments.get(0).segmentName, 
defaultSegment.getSegmentName());
-    Assert.assertEquals(tableSizeInfo.segments.get(0).diskSizeInBytes, 
defaultSegment.getSegmentSizeBytes());
-    Assert.assertEquals(tableSizeInfo.diskSizeInBytes, 
defaultSegment.getSegmentSizeBytes());
+    Assert.assertEquals(tableSizeInfo.segments.get(0).segmentName, 
segment.getSegmentName());
+    Assert.assertEquals(tableSizeInfo.segments.get(0).diskSizeInBytes, 
segment.getSegmentSizeBytes());
+    Assert.assertEquals(tableSizeInfo.diskSizeInBytes, 
segment.getSegmentSizeBytes());
   }
 }
diff --git 
a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
 
b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
index 24d36eb..9363b5a 100644
--- 
a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
+++ 
b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
@@ -19,19 +19,33 @@
 package org.apache.pinot.server.api;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.List;
 import javax.ws.rs.core.Response;
+import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.restlet.resources.TableSegments;
 import org.apache.pinot.common.restlet.resources.TablesList;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 
 public class TablesResourceTest extends BaseResourceTest {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TablesResourceTest.class);
 
   @Test
   public void getTables()
@@ -45,10 +59,11 @@ public class TablesResourceTest extends BaseResourceTest {
     Assert.assertNotNull(tablesList);
     List<String> tables = tablesList.getTables();
     Assert.assertNotNull(tables);
-    Assert.assertEquals(tables.size(), 1);
-    Assert.assertEquals(tables.get(0), TABLE_NAME);
+    Assert.assertEquals(tables.size(), 2);
+    Assert.assertEquals(tables.get(0), 
TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME));
+    Assert.assertEquals(tables.get(1), 
TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME));
 
-    String secondTable = "secondTable";
+    String secondTable = "secondTable_REALTIME";
     addTable(secondTable);
     response = _webTarget.path(tablesPath).request().get(Response.class);
     responseBody = response.readEntity(String.class);
@@ -57,25 +72,27 @@ public class TablesResourceTest extends BaseResourceTest {
     Assert.assertNotNull(tablesList);
     tables = tablesList.getTables();
     Assert.assertNotNull(tables);
-    Assert.assertEquals(tables.size(), 2);
-    Assert.assertTrue(tables.contains(TABLE_NAME));
+    Assert.assertEquals(tables.size(), 3);
+    
Assert.assertTrue(tables.contains(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME)));
     Assert.assertTrue(tables.contains(secondTable));
+    
Assert.assertTrue(tables.contains(TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME)));
   }
 
   @Test
   public void getSegments()
       throws Exception {
-    String segmentsPath = "/tables/" + TABLE_NAME + "/segments";
-    IndexSegment defaultSegment = _indexSegments.get(0);
+    String segmentsPath = "/tables/" + 
TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/segments";
+    IndexSegment defaultSegment = _realtimeIndexSegments.get(0);
 
     TableSegments tableSegments = 
_webTarget.path(segmentsPath).request().get(TableSegments.class);
     Assert.assertNotNull(tableSegments);
     List<String> segmentNames = tableSegments.getSegments();
     Assert.assertNotNull(segmentNames);
     Assert.assertEquals(segmentNames.size(), 1);
-    Assert.assertEquals(segmentNames.get(0), 
_indexSegments.get(0).getSegmentName());
+    Assert.assertEquals(segmentNames.get(0), 
_realtimeIndexSegments.get(0).getSegmentName());
 
-    IndexSegment secondSegment = setUpSegment("0");
+    IndexSegment secondSegment =
+        setUpSegment(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), 
"0", _realtimeIndexSegments);
     tableSegments = 
_webTarget.path(segmentsPath).request().get(TableSegments.class);
     Assert.assertNotNull(tableSegments);
     segmentNames = tableSegments.getSegments();
@@ -93,8 +110,10 @@ public class TablesResourceTest extends BaseResourceTest {
   @Test
   public void testSegmentMetadata()
       throws Exception {
-    IndexSegment defaultSegment = _indexSegments.get(0);
-    String segmentMetadataPath = "/tables/" + TABLE_NAME + "/segments/" + 
defaultSegment.getSegmentName() + "/metadata";
+    IndexSegment defaultSegment = _realtimeIndexSegments.get(0);
+    String segmentMetadataPath =
+        "/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + 
"/segments/" + defaultSegment
+            .getSegmentName() + "/metadata";
 
     JsonNode jsonResponse =
         
JsonUtils.stringToJsonNode(_webTarget.path(segmentMetadataPath).request().get(String.class));
@@ -126,17 +145,20 @@ public class TablesResourceTest extends BaseResourceTest {
         .get(Response.class);
     Assert.assertEquals(response.getStatus(), 
Response.Status.NOT_FOUND.getStatusCode());
 
-    response = _webTarget.path("/tables/" + TABLE_NAME + 
"/segments/UNKNOWN_SEGMENT").request().get(Response.class);
+    response = _webTarget
+        .path("/tables/" + 
TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + 
"/segments/UNKNOWN_SEGMENT")
+        .request().get(Response.class);
     Assert.assertEquals(response.getStatus(), 
Response.Status.NOT_FOUND.getStatusCode());
   }
 
   @Test
   public void testSegmentCrcMetadata()
       throws Exception {
-    String segmentsCrcPath = "/tables/" + TABLE_NAME + "/segments/crc";
+    String segmentsCrcPath = "/tables/" + 
TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/segments/crc";
 
     // Upload segments
-    List<ImmutableSegment> immutableSegments = setUpSegments(2);
+    List<ImmutableSegment> immutableSegments =
+        setUpSegments(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), 
2, _realtimeIndexSegments);
 
     // Trigger crc api to fetch crc information
     String response = 
_webTarget.path(segmentsCrcPath).request().get(String.class);
@@ -149,4 +171,66 @@ public class TablesResourceTest extends BaseResourceTest {
       Assert.assertEquals(segmentsCrc.get(segmentName).asText(), crc);
     }
   }
+
+  @Test
+  public void testDownloadSegments()
+      throws Exception {
+    // Verify the content of the downloaded segment from a realtime table.
+    
Assert.assertTrue(downLoadAndVerifySegmentContent(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
+        _realtimeIndexSegments.get(0)));
+    // Verify the content of the downloaded segment from an offline table.
+    
Assert.assertTrue(downLoadAndVerifySegmentContent(TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME),
+        _offlineIndexSegments.get(0)));
+
+    // Verify non-existent table and segment download return NOT_FOUND status.
+    Response response = 
_webTarget.path("/tables/UNKNOWN_REALTIME/segments/segmentname").request().get(Response.class);
+    Assert.assertEquals(response.getStatus(), 
Response.Status.NOT_FOUND.getStatusCode());
+
+    response = _webTarget
+        .path("/tables/" + 
TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + 
"/segments/UNKNOWN_SEGMENT")
+        .request().get(Response.class);
+    Assert.assertEquals(response.getStatus(), 
Response.Status.NOT_FOUND.getStatusCode());
+  }
+
+  // Verify metadata file from segments.
+  private boolean downLoadAndVerifySegmentContent(String tableNameWithType, 
IndexSegment segment) {
+    String segmentPath = "/segments/" + tableNameWithType + "/" + 
segment.getSegmentName();
+
+    // Download the segment and save to a temp local file.
+    Response response = 
_webTarget.path(segmentPath).request().get(Response.class);
+    Assert.assertEquals(response.getStatus(), 
Response.Status.OK.getStatusCode());
+    File segmentFile = response.readEntity(File.class);
+
+    File tempMetadataDir = new File(FileUtils.getTempDirectory(), 
"segment_metadata");
+    try (// Extract metadata.properties
+        InputStream metadataPropertiesInputStream = TarGzCompressionUtils
+            .unTarOneFile(new FileInputStream(segmentFile), 
V1Constants.MetadataKeys.METADATA_FILE_NAME);
+        // Extract creation.meta
+        InputStream creationMetaInputStream = TarGzCompressionUtils
+            .unTarOneFile(new FileInputStream(segmentFile), 
V1Constants.SEGMENT_CREATION_META)) {
+      Preconditions
+          .checkState(tempMetadataDir.mkdirs(), "Failed to create directory: 
%s", tempMetadataDir.getAbsolutePath());
+
+      Preconditions.checkNotNull(metadataPropertiesInputStream, "%s does not 
exist",
+          V1Constants.MetadataKeys.METADATA_FILE_NAME);
+      Path metadataPropertiesPath = FileSystems.getDefault()
+          .getPath(tempMetadataDir.getAbsolutePath(), 
V1Constants.MetadataKeys.METADATA_FILE_NAME);
+      Files.copy(metadataPropertiesInputStream, metadataPropertiesPath);
+
+      Preconditions.checkNotNull(creationMetaInputStream, "%s does not exist", 
V1Constants.SEGMENT_CREATION_META);
+      Path creationMetaPath =
+          FileSystems.getDefault().getPath(tempMetadataDir.getAbsolutePath(), 
V1Constants.SEGMENT_CREATION_META);
+      Files.copy(creationMetaInputStream, creationMetaPath);
+      // Load segment metadata
+      SegmentMetadataImpl metadata = new SegmentMetadataImpl(tempMetadataDir);
+
+      Assert.assertEquals(tableNameWithType, metadata.getTableName());
+      return true;
+    } catch (Exception e) {
+      LOGGER.error("Failure in segment extraction and verification:", e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(tempMetadataDir);
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to