This is an automated email from the ASF dual-hosted git repository.
mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c92d677 Add a new server api for download of segments. (#5221)
c92d677 is described below
commit c92d6773c3c1539f9e9ec533dbe980fbaaeee6b4
Author: Ting Chen <[email protected]>
AuthorDate: Wed Apr 15 09:16:56 2020 -0700
Add a new server api for download of segments. (#5221)
* Add a new server api for download of segments.
* Revise based on feedback.
* Revise the getTableDir and delete tar.gz files on exit.
* Store temp tar.gz file in the server's segment tar folder.
* Further revision based on feedbacks.
* Fix tests.
* Minor typo.
---
.../core/data/manager/BaseTableDataManager.java | 5 +
.../pinot/core/data/manager/TableDataManager.java | 6 ++
.../pinot/server/api/resources/TablesResource.java | 57 +++++++++++
.../apache/pinot/server/api/BaseResourceTest.java | 46 ++++++---
.../pinot/server/api/TableSizeResourceTest.java | 55 ++++++----
.../pinot/server/api/TablesResourceTest.java | 114 ++++++++++++++++++---
6 files changed, 231 insertions(+), 52 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 6ea0b9a..ea7279c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -211,4 +211,9 @@ public abstract class BaseTableDataManager implements
TableDataManager {
public String getTableName() {
return _tableNameWithType;
}
+
+ @Override
+ public File getTableDataDir() {
+ return _indexDir;
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java
index 79a3445..dc9e193 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java
@@ -116,4 +116,10 @@ public interface TableDataManager {
* Returns the table name managed by this instance.
*/
String getTableName();
+
+ /**
+ * Returns the dir which contains the data segments.
+ * @return
+ */
+ File getTableDataDir();
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index 662d4e0..91a2008 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -23,25 +23,34 @@ import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
+import java.io.File;
+import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import javax.inject.Inject;
import javax.ws.rs.DefaultValue;
+import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.restlet.resources.ResourceUtils;
import org.apache.pinot.common.restlet.resources.TableSegments;
import org.apache.pinot.common.restlet.resources.TablesList;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.SegmentDataManager;
import org.apache.pinot.core.data.manager.TableDataManager;
@@ -55,6 +64,7 @@ import org.slf4j.LoggerFactory;
@Path("/")
public class TablesResource {
private static final Logger LOGGER =
LoggerFactory.getLogger(TablesResource.class);
+ private static final String PEER_SEGMENT_DOWNLOAD_DIR =
"peerSegmentDownloadDir";
@Inject
ServerInstance serverInstance;
@@ -175,4 +185,51 @@ public class TablesResource {
}
}
}
+
+ // TODO Add access control similar to
PinotSegmentUploadDownloadRestletResource for segment download.
+ @GET
+ @Produces(MediaType.APPLICATION_OCTET_STREAM)
+ @Path("/segments/{tableNameWithType}/{segmentName}")
+ @ApiOperation(value = "Download an immutable segment", notes = "Download an
immutable segment in zipped tar format.")
+ public Response downloadSegment(
+ @ApiParam(value = "Name of the table with type REALTIME OR OFFLINE",
required = true, example = "myTable_OFFLINE") @PathParam("tableNameWithType")
String tableNameWithType,
+ @ApiParam(value = "Name of the segment", required = true)
@PathParam("segmentName") @Encoded String segmentName,
+ @Context HttpHeaders httpHeaders)
+ throws Exception {
+ LOGGER.info("Received a request to download segment {} for table {}",
segmentName, tableNameWithType);
+ TableDataManager tableDataManager =
checkGetTableDataManager(tableNameWithType);
+ SegmentDataManager segmentDataManager =
tableDataManager.acquireSegment(segmentName);
+ if (segmentDataManager == null) {
+ throw new WebApplicationException(
+ String.format("Table %s segment %s does not exist",
tableNameWithType, segmentName),
+ Response.Status.NOT_FOUND);
+ }
+ try {
+ // TODO Limit the number of concurrent downloads of segments because
compression is an expensive operation.
+ // Store the tar.gz segment file in the server's segmentTarDir folder
with a unique file name.
+ // Note that two clients asking the same segment file will result in the
same tar.gz files being created twice.
+ // Will revisit for optimization if performance becomes an issue.
+ File tmpSegmentTarDir = new
File(serverInstance.getInstanceDataManager().getSegmentFileDirectory(),
PEER_SEGMENT_DOWNLOAD_DIR);
+ tmpSegmentTarDir.mkdir();
+
+ String tarFilePath = TarGzCompressionUtils
+ .createTarGzOfDirectory(new File(tableDataManager.getTableDataDir(),
segmentName).getAbsolutePath(),
+ new File(tmpSegmentTarDir, tableNameWithType+ "_" + segmentName
+ "_" + UUID.randomUUID()).getAbsolutePath());
+ File tarFile = new File(tarFilePath);
+ tarFile.deleteOnExit();
+ Response.ResponseBuilder builder = Response.ok();
+ builder.entity((StreamingOutput) output -> {
+ try {
+ Files.copy(tarFile.toPath(), output);
+ } finally {
+ FileUtils.deleteQuietly(tarFile);
+ }
+ });
+ builder.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename="
+ tarFile.getName());
+ builder.header(HttpHeaders.CONTENT_LENGTH, tarFile.length());
+ return builder.build();
+ } finally {
+ tableDataManager.releaseSegment(segmentDataManager);
+ }
+ }
}
diff --git
a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
index 08ab8c3..f4b5b1e 100644
---
a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
+++
b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.metrics.ServerMetrics;
@@ -43,6 +44,7 @@ import
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl
import org.apache.pinot.segments.v1.creator.SegmentTestUtils;
import org.apache.pinot.server.starter.ServerInstance;
import org.apache.pinot.server.starter.helix.AdminApiApplication;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -58,8 +60,8 @@ public abstract class BaseResourceTest {
protected static final String TABLE_NAME = "testTable";
private final Map<String, TableDataManager> _tableDataManagerMap = new
HashMap<>();
- protected final List<ImmutableSegment> _indexSegments = new ArrayList<>();
-
+ protected final List<ImmutableSegment> _realtimeIndexSegments = new
ArrayList<>();
+ protected final List<ImmutableSegment> _offlineIndexSegments = new
ArrayList<>();
private File _avroFile;
private AdminApiApplication _adminApiApplication;
protected WebTarget _webTarget;
@@ -83,10 +85,16 @@ public abstract class BaseResourceTest {
// Mock the server instance
ServerInstance serverInstance = mock(ServerInstance.class);
when(serverInstance.getInstanceDataManager()).thenReturn(instanceDataManager);
+
when(serverInstance.getInstanceDataManager().getSegmentFileDirectory()).thenReturn(FileUtils.getTempDirectoryPath());
+
+ // Add the default tables and segments.
+ String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME);
+ String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME);
- // Add the default table and segment
- addTable(TABLE_NAME);
- setUpSegment("default");
+ addTable(realtimeTableName);
+ addTable(offlineTableName);
+ setUpSegment(realtimeTableName, "default", _realtimeIndexSegments);
+ setUpSegment(offlineTableName, "default", _offlineIndexSegments);
_adminApiApplication = new AdminApiApplication(serverInstance);
_adminApiApplication.start(CommonConstants.Server.DEFAULT_ADMIN_API_PORT);
@@ -96,46 +104,50 @@ public abstract class BaseResourceTest {
@AfterClass
public void tearDown() {
_adminApiApplication.stop();
- for (ImmutableSegment immutableSegment : _indexSegments) {
+ for (ImmutableSegment immutableSegment : _realtimeIndexSegments) {
+ immutableSegment.destroy();
+ }
+ for (ImmutableSegment immutableSegment : _offlineIndexSegments) {
immutableSegment.destroy();
}
FileUtils.deleteQuietly(INDEX_DIR);
}
- protected List<ImmutableSegment> setUpSegments(int numSegments)
+ protected List<ImmutableSegment> setUpSegments(String tableNameWithType, int
numSegments, List<ImmutableSegment> segments)
throws Exception {
List<ImmutableSegment> immutableSegments = new ArrayList<>();
for (int i = 0; i < numSegments; i++) {
-
immutableSegments.add(setUpSegment(Integer.toString(_indexSegments.size())));
+ immutableSegments.add(setUpSegment(tableNameWithType,
Integer.toString(_realtimeIndexSegments.size()), segments));
}
return immutableSegments;
}
- protected ImmutableSegment setUpSegment(String segmentNamePostfix)
+ protected ImmutableSegment setUpSegment(String tableNameWithType, String
segmentNamePostfix, List<ImmutableSegment> segments)
throws Exception {
SegmentGeneratorConfig config =
- SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(_avroFile,
INDEX_DIR, TABLE_NAME);
+ SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(_avroFile,
INDEX_DIR, tableNameWithType);
config.setSegmentNamePostfix(segmentNamePostfix);
SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
driver.init(config);
driver.build();
ImmutableSegment immutableSegment =
ImmutableSegmentLoader.load(new File(INDEX_DIR,
driver.getSegmentName()), ReadMode.mmap);
- _indexSegments.add(immutableSegment);
- _tableDataManagerMap.get(TABLE_NAME).addSegment(immutableSegment);
+ segments.add(immutableSegment);
+ _tableDataManagerMap.get(tableNameWithType).addSegment(immutableSegment);
return immutableSegment;
}
@SuppressWarnings("unchecked")
- protected void addTable(String tableName) {
+ protected void addTable(String tableNameWithType) {
TableDataManagerConfig tableDataManagerConfig =
mock(TableDataManagerConfig.class);
-
when(tableDataManagerConfig.getTableDataManagerType()).thenReturn("OFFLINE");
- when(tableDataManagerConfig.getTableName()).thenReturn(tableName);
-
when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
+ when(tableDataManagerConfig.getTableDataManagerType())
+
.thenReturn(TableNameBuilder.getTableTypeFromTableName(tableNameWithType).name());
+ when(tableDataManagerConfig.getTableName()).thenReturn(tableNameWithType);
+
when(tableDataManagerConfig.getDataDir()).thenReturn(INDEX_DIR.getAbsolutePath());
TableDataManager tableDataManager = TableDataManagerProvider
.getTableDataManager(tableDataManagerConfig, "testInstance",
mock(ZkHelixPropertyStore.class),
mock(ServerMetrics.class));
tableDataManager.start();
- _tableDataManagerMap.put(tableName, tableDataManager);
+ _tableDataManagerMap.put(tableNameWithType, tableDataManager);
}
}
diff --git
a/pinot-server/src/test/java/org/apache/pinot/server/api/TableSizeResourceTest.java
b/pinot-server/src/test/java/org/apache/pinot/server/api/TableSizeResourceTest.java
index f30b291..790d120 100644
---
a/pinot-server/src/test/java/org/apache/pinot/server/api/TableSizeResourceTest.java
+++
b/pinot-server/src/test/java/org/apache/pinot/server/api/TableSizeResourceTest.java
@@ -21,13 +21,12 @@ package org.apache.pinot.server.api;
import javax.ws.rs.core.Response;
import org.apache.pinot.common.restlet.resources.TableSizeInfo;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.Assert;
import org.testng.annotations.Test;
public class TableSizeResourceTest extends BaseResourceTest {
- private static final String TABLE_SIZE_PATH = "/tables/" + TABLE_NAME +
"/size";
-
@Test
public void testTableSizeNotFound() {
Response response =
_webTarget.path("table/unknownTable/size").request().get(Response.class);
@@ -36,38 +35,54 @@ public class TableSizeResourceTest extends BaseResourceTest
{
@Test
public void testTableSizeDetailed() {
- TableSizeInfo tableSizeInfo =
_webTarget.path(TABLE_SIZE_PATH).request().get(TableSizeInfo.class);
- ImmutableSegment defaultSegment = _indexSegments.get(0);
+
verifyTableSizeDetailedImpl(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
_realtimeIndexSegments.get(0));
+
verifyTableSizeDetailedImpl(TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME),
_offlineIndexSegments.get(0));
+ }
- Assert.assertEquals(tableSizeInfo.tableName, TABLE_NAME);
- Assert.assertEquals(tableSizeInfo.diskSizeInBytes,
defaultSegment.getSegmentSizeBytes());
+ private void verifyTableSizeDetailedImpl(String expectedTableName,
ImmutableSegment segment) {
+ String path = "/tables/" + expectedTableName + "/size";
+ TableSizeInfo tableSizeInfo =
_webTarget.path(path).request().get(TableSizeInfo.class);
+
+ Assert.assertEquals(tableSizeInfo.tableName, expectedTableName);
+ Assert.assertEquals(tableSizeInfo.diskSizeInBytes,
segment.getSegmentSizeBytes());
Assert.assertEquals(tableSizeInfo.segments.size(), 1);
- Assert.assertEquals(tableSizeInfo.segments.get(0).segmentName,
defaultSegment.getSegmentName());
- Assert.assertEquals(tableSizeInfo.segments.get(0).diskSizeInBytes,
defaultSegment.getSegmentSizeBytes());
- Assert.assertEquals(tableSizeInfo.diskSizeInBytes,
defaultSegment.getSegmentSizeBytes());
+ Assert.assertEquals(tableSizeInfo.segments.get(0).segmentName,
segment.getSegmentName());
+ Assert.assertEquals(tableSizeInfo.segments.get(0).diskSizeInBytes,
segment.getSegmentSizeBytes());
+ Assert.assertEquals(tableSizeInfo.diskSizeInBytes,
segment.getSegmentSizeBytes());
}
@Test
public void testTableSizeNoDetails() {
+
verifyTableSizeNoDetailsImpl(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
+ _realtimeIndexSegments.get(0));
+
verifyTableSizeNoDetailsImpl(TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME),
_offlineIndexSegments.get(0));
+ }
+
+ private void verifyTableSizeNoDetailsImpl(String expectedTableName,
ImmutableSegment segment) {
+ String path = "/tables/" + expectedTableName + "/size";
TableSizeInfo tableSizeInfo =
- _webTarget.path(TABLE_SIZE_PATH).queryParam("detailed",
"false").request().get(TableSizeInfo.class);
- ImmutableSegment defaultSegment = _indexSegments.get(0);
+ _webTarget.path(path).queryParam("detailed",
"false").request().get(TableSizeInfo.class);
- Assert.assertEquals(tableSizeInfo.tableName, TABLE_NAME);
- Assert.assertEquals(tableSizeInfo.diskSizeInBytes,
defaultSegment.getSegmentSizeBytes());
+ Assert.assertEquals(tableSizeInfo.tableName, expectedTableName);
+ Assert.assertEquals(tableSizeInfo.diskSizeInBytes,
segment.getSegmentSizeBytes());
Assert.assertEquals(tableSizeInfo.segments.size(), 0);
}
@Test
public void testTableSizeOld() {
- TableSizeInfo tableSizeInfo = _webTarget.path("/table/" + TABLE_NAME +
"/size").request().get(TableSizeInfo.class);
- ImmutableSegment defaultSegment = _indexSegments.get(0);
+
verifyTableSizeOldImpl(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
_realtimeIndexSegments.get(0));
+
verifyTableSizeOldImpl(TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME),
_offlineIndexSegments.get(0));
+ }
+
+ private void verifyTableSizeOldImpl(String expectedTableName,
ImmutableSegment segment) {
+ String path = "/table/" + expectedTableName + "/size";
+ TableSizeInfo tableSizeInfo =
_webTarget.path(path).request().get(TableSizeInfo.class);
- Assert.assertEquals(tableSizeInfo.tableName, TABLE_NAME);
- Assert.assertEquals(tableSizeInfo.diskSizeInBytes,
defaultSegment.getSegmentSizeBytes());
+ Assert.assertEquals(tableSizeInfo.tableName, expectedTableName);
+ Assert.assertEquals(tableSizeInfo.diskSizeInBytes,
segment.getSegmentSizeBytes());
Assert.assertEquals(tableSizeInfo.segments.size(), 1);
- Assert.assertEquals(tableSizeInfo.segments.get(0).segmentName,
defaultSegment.getSegmentName());
- Assert.assertEquals(tableSizeInfo.segments.get(0).diskSizeInBytes,
defaultSegment.getSegmentSizeBytes());
- Assert.assertEquals(tableSizeInfo.diskSizeInBytes,
defaultSegment.getSegmentSizeBytes());
+ Assert.assertEquals(tableSizeInfo.segments.get(0).segmentName,
segment.getSegmentName());
+ Assert.assertEquals(tableSizeInfo.segments.get(0).diskSizeInBytes,
segment.getSegmentSizeBytes());
+ Assert.assertEquals(tableSizeInfo.diskSizeInBytes,
segment.getSegmentSizeBytes());
}
}
diff --git
a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
index 24d36eb..9363b5a 100644
---
a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
+++
b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
@@ -19,19 +19,33 @@
package org.apache.pinot.server.api;
import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.List;
import javax.ws.rs.core.Response;
+import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.restlet.resources.TableSegments;
import org.apache.pinot.common.restlet.resources.TablesList;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.core.indexsegment.IndexSegment;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
public class TablesResourceTest extends BaseResourceTest {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TablesResourceTest.class);
@Test
public void getTables()
@@ -45,10 +59,11 @@ public class TablesResourceTest extends BaseResourceTest {
Assert.assertNotNull(tablesList);
List<String> tables = tablesList.getTables();
Assert.assertNotNull(tables);
- Assert.assertEquals(tables.size(), 1);
- Assert.assertEquals(tables.get(0), TABLE_NAME);
+ Assert.assertEquals(tables.size(), 2);
+ Assert.assertEquals(tables.get(0),
TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME));
+ Assert.assertEquals(tables.get(1),
TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME));
- String secondTable = "secondTable";
+ String secondTable = "secondTable_REALTIME";
addTable(secondTable);
response = _webTarget.path(tablesPath).request().get(Response.class);
responseBody = response.readEntity(String.class);
@@ -57,25 +72,27 @@ public class TablesResourceTest extends BaseResourceTest {
Assert.assertNotNull(tablesList);
tables = tablesList.getTables();
Assert.assertNotNull(tables);
- Assert.assertEquals(tables.size(), 2);
- Assert.assertTrue(tables.contains(TABLE_NAME));
+ Assert.assertEquals(tables.size(), 3);
+
Assert.assertTrue(tables.contains(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME)));
Assert.assertTrue(tables.contains(secondTable));
+
Assert.assertTrue(tables.contains(TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME)));
}
@Test
public void getSegments()
throws Exception {
- String segmentsPath = "/tables/" + TABLE_NAME + "/segments";
- IndexSegment defaultSegment = _indexSegments.get(0);
+ String segmentsPath = "/tables/" +
TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/segments";
+ IndexSegment defaultSegment = _realtimeIndexSegments.get(0);
TableSegments tableSegments =
_webTarget.path(segmentsPath).request().get(TableSegments.class);
Assert.assertNotNull(tableSegments);
List<String> segmentNames = tableSegments.getSegments();
Assert.assertNotNull(segmentNames);
Assert.assertEquals(segmentNames.size(), 1);
- Assert.assertEquals(segmentNames.get(0),
_indexSegments.get(0).getSegmentName());
+ Assert.assertEquals(segmentNames.get(0),
_realtimeIndexSegments.get(0).getSegmentName());
- IndexSegment secondSegment = setUpSegment("0");
+ IndexSegment secondSegment =
+ setUpSegment(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
"0", _realtimeIndexSegments);
tableSegments =
_webTarget.path(segmentsPath).request().get(TableSegments.class);
Assert.assertNotNull(tableSegments);
segmentNames = tableSegments.getSegments();
@@ -93,8 +110,10 @@ public class TablesResourceTest extends BaseResourceTest {
@Test
public void testSegmentMetadata()
throws Exception {
- IndexSegment defaultSegment = _indexSegments.get(0);
- String segmentMetadataPath = "/tables/" + TABLE_NAME + "/segments/" +
defaultSegment.getSegmentName() + "/metadata";
+ IndexSegment defaultSegment = _realtimeIndexSegments.get(0);
+ String segmentMetadataPath =
+ "/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) +
"/segments/" + defaultSegment
+ .getSegmentName() + "/metadata";
JsonNode jsonResponse =
JsonUtils.stringToJsonNode(_webTarget.path(segmentMetadataPath).request().get(String.class));
@@ -126,17 +145,20 @@ public class TablesResourceTest extends BaseResourceTest {
.get(Response.class);
Assert.assertEquals(response.getStatus(),
Response.Status.NOT_FOUND.getStatusCode());
- response = _webTarget.path("/tables/" + TABLE_NAME +
"/segments/UNKNOWN_SEGMENT").request().get(Response.class);
+ response = _webTarget
+ .path("/tables/" +
TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) +
"/segments/UNKNOWN_SEGMENT")
+ .request().get(Response.class);
Assert.assertEquals(response.getStatus(),
Response.Status.NOT_FOUND.getStatusCode());
}
@Test
public void testSegmentCrcMetadata()
throws Exception {
- String segmentsCrcPath = "/tables/" + TABLE_NAME + "/segments/crc";
+ String segmentsCrcPath = "/tables/" +
TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/segments/crc";
// Upload segments
- List<ImmutableSegment> immutableSegments = setUpSegments(2);
+ List<ImmutableSegment> immutableSegments =
+ setUpSegments(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
2, _realtimeIndexSegments);
// Trigger crc api to fetch crc information
String response =
_webTarget.path(segmentsCrcPath).request().get(String.class);
@@ -149,4 +171,66 @@ public class TablesResourceTest extends BaseResourceTest {
Assert.assertEquals(segmentsCrc.get(segmentName).asText(), crc);
}
}
+
+ @Test
+ public void testDownloadSegments()
+ throws Exception {
+ // Verify the content of the downloaded segment from a realtime table.
+
Assert.assertTrue(downLoadAndVerifySegmentContent(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
+ _realtimeIndexSegments.get(0)));
+ // Verify the content of the downloaded segment from an offline table.
+
Assert.assertTrue(downLoadAndVerifySegmentContent(TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME),
+ _offlineIndexSegments.get(0)));
+
+ // Verify non-existent table and segment download return NOT_FOUND status.
+ Response response =
_webTarget.path("/tables/UNKNOWN_REALTIME/segments/segmentname").request().get(Response.class);
+ Assert.assertEquals(response.getStatus(),
Response.Status.NOT_FOUND.getStatusCode());
+
+ response = _webTarget
+ .path("/tables/" +
TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) +
"/segments/UNKNOWN_SEGMENT")
+ .request().get(Response.class);
+ Assert.assertEquals(response.getStatus(),
Response.Status.NOT_FOUND.getStatusCode());
+ }
+
+ // Verify metadata file from segments.
+ private boolean downLoadAndVerifySegmentContent(String tableNameWithType,
IndexSegment segment) {
+ String segmentPath = "/segments/" + tableNameWithType + "/" +
segment.getSegmentName();
+
+ // Download the segment and save to a temp local file.
+ Response response =
_webTarget.path(segmentPath).request().get(Response.class);
+ Assert.assertEquals(response.getStatus(),
Response.Status.OK.getStatusCode());
+ File segmentFile = response.readEntity(File.class);
+
+ File tempMetadataDir = new File(FileUtils.getTempDirectory(),
"segment_metadata");
+ try (// Extract metadata.properties
+ InputStream metadataPropertiesInputStream = TarGzCompressionUtils
+ .unTarOneFile(new FileInputStream(segmentFile),
V1Constants.MetadataKeys.METADATA_FILE_NAME);
+ // Extract creation.meta
+ InputStream creationMetaInputStream = TarGzCompressionUtils
+ .unTarOneFile(new FileInputStream(segmentFile),
V1Constants.SEGMENT_CREATION_META)) {
+ Preconditions
+ .checkState(tempMetadataDir.mkdirs(), "Failed to create directory:
%s", tempMetadataDir.getAbsolutePath());
+
+ Preconditions.checkNotNull(metadataPropertiesInputStream, "%s does not
exist",
+ V1Constants.MetadataKeys.METADATA_FILE_NAME);
+ Path metadataPropertiesPath = FileSystems.getDefault()
+ .getPath(tempMetadataDir.getAbsolutePath(),
V1Constants.MetadataKeys.METADATA_FILE_NAME);
+ Files.copy(metadataPropertiesInputStream, metadataPropertiesPath);
+
+ Preconditions.checkNotNull(creationMetaInputStream, "%s does not exist",
V1Constants.SEGMENT_CREATION_META);
+ Path creationMetaPath =
+ FileSystems.getDefault().getPath(tempMetadataDir.getAbsolutePath(),
V1Constants.SEGMENT_CREATION_META);
+ Files.copy(creationMetaInputStream, creationMetaPath);
+ // Load segment metadata
+ SegmentMetadataImpl metadata = new SegmentMetadataImpl(tempMetadataDir);
+
+ Assert.assertEquals(tableNameWithType, metadata.getTableName());
+ return true;
+ } catch (Exception e) {
+ LOGGER.error("Failure in segment extraction and verification:", e);
+ return false;
+ } finally {
+ FileUtils.deleteQuietly(tempMetadataDir);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]