This is an automated email from the ASF dual-hosted git repository.
tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6c6d08e add uploadLLCSegment endpoint in TableResource (#6653)
6c6d08e is described below
commit 6c6d08ebbc0439a4c1359c95480c6fa767033f20
Author: Chang <[email protected]>
AuthorDate: Wed Mar 31 09:10:13 2021 -0700
add uploadLLCSegment endpoint in TableResource (#6653)
* add uploadLLCSegment endpoint in TableResource
* add upload to rest path
* use single class import
* update comment; throw exception when upload fails
* add constant for config: segment.store.uri
* add comment for getSegmentUploader
---
.../apache/pinot/common/utils/CommonConstants.java | 3 +
.../core/data/manager/InstanceDataManager.java | 6 ++
.../manager/config/InstanceDataManagerConfig.java | 2 +
.../segment/index/loader/IndexLoadingConfig.java | 5 +-
.../pinot/server/api/resources/TablesResource.java | 70 ++++++++++++++++++++++
.../starter/helix/HelixInstanceDataManager.java | 10 ++++
.../helix/HelixInstanceDataManagerConfig.java | 7 +++
.../apache/pinot/server/api/BaseResourceTest.java | 28 ++++++---
.../pinot/server/api/TablesResourceTest.java | 32 +++++++++-
9 files changed, 153 insertions(+), 10 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index b9beb55..8ed12fe 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -307,6 +307,9 @@ public class CommonConstants {
public static final String SERVER_TLS_PREFIX = "pinot.server.tls";
public static final String SERVER_NETTYTLS_PREFIX =
"pinot.server.nettytls";
+ // The complete config key is pinot.server.instance.segment.store.uri
+ public static final String CONFIG_OF_SEGMENT_STORE_URI =
"segment.store.uri";
+
public static class SegmentCompletionProtocol {
public static final String PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER =
"pinot.server.segment.uploader";
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index a28e069..f43ff63 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -30,6 +30,7 @@ import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -132,4 +133,9 @@ public interface InstanceDataManager {
* Returns the Helix property store.
*/
ZkHelixPropertyStore<ZNRecord> getPropertyStore();
+
+ /**
+ * Returns the segment uploader, which uploads an LLC segment to the
destination and returns the URL of the uploaded segment file. Servers use the
segment uploader to upload LLC segments to the segment store.
+ */
+ SegmentUploader getSegmentUploader();
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java
index cf2aeb9..c03fb20 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java
@@ -35,6 +35,8 @@ public interface InstanceDataManagerConfig {
String getInstanceBootstrapSegmentDir();
+ String getSegmentStoreUri();
+
ReadMode getReadMode();
String getSegmentFormatVersion();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
index f3a2644..0436eae 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
@@ -37,13 +37,14 @@ import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
+import static
org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SEGMENT_STORE_URI;
+
/**
* Table level index loading config.
*/
public class IndexLoadingConfig {
private static final int DEFAULT_REALTIME_AVG_MULTI_VALUE_COUNT = 2;
- private static final String SEGMENT_STORE_URI = "segment.store.uri";
private ReadMode _readMode = ReadMode.DEFAULT_MODE;
private List<String> _sortedColumns = Collections.emptyList();
@@ -235,7 +236,7 @@ public class IndexLoadingConfig {
_realtimeAvgMultiValueCount = Integer.valueOf(avgMultiValueCount);
}
_enableSplitCommitEndWithMetadata =
instanceDataManagerConfig.isEnableSplitCommitEndWithMetadata();
- _segmentStoreURI =
instanceDataManagerConfig.getConfig().getProperty(SEGMENT_STORE_URI);
+ _segmentStoreURI =
instanceDataManagerConfig.getConfig().getProperty(CONFIG_OF_SEGMENT_STORE_URI);
}
/**
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index e5d533c..174d687 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -24,6 +24,7 @@ import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.io.File;
+import java.net.URI;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashMap;
@@ -34,6 +35,7 @@ import javax.inject.Inject;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
+import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
@@ -49,11 +51,13 @@ import
org.apache.pinot.common.restlet.resources.SegmentConsumerInfo;
import org.apache.pinot.common.restlet.resources.ResourceUtils;
import org.apache.pinot.common.restlet.resources.TableSegments;
import org.apache.pinot.common.restlet.resources.TablesList;
+import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.SegmentDataManager;
import org.apache.pinot.core.data.manager.TableDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.server.api.access.AccessControl;
import org.apache.pinot.server.api.access.AccessControlFactory;
@@ -69,6 +73,7 @@ import org.slf4j.LoggerFactory;
public class TablesResource {
private static final Logger LOGGER =
LoggerFactory.getLogger(TablesResource.class);
private static final String PEER_SEGMENT_DOWNLOAD_DIR =
"peerSegmentDownloadDir";
+ private static final String SEGMENT_UPLOAD_DIR = "segmentUploadDir";
@Inject
private ServerInstance _serverInstance;
@@ -244,6 +249,71 @@ public class TablesResource {
}
}
+ /**
+ * Upload a low level consumer segment to the segment store and return the
segment download URL. This endpoint is used when the segment store copy is
unavailable for a committed low level consumer segment.
+ * Please note that invoking this endpoint may cause query performance
to suffer, since we tar up the segment to upload it.
+ * @see <a
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing
deep-store requirement for Realtime segment completion:Failure cases and
handling</a>
+ */
+ @POST
+ @Path("/segments/{realtimeTableName}/{segmentName}/upload")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Upload a low level consumer segment to segment store
and return the segment download url", notes = "Upload a low level consumer
segment to segment store and return the segment download url")
+ @ApiResponses(value = {@ApiResponse(code = 200, message = "Success"),
@ApiResponse(code = 500, message = "Internal server error", response =
ErrorInfo.class), @ApiResponse(code = 404, message = "Table or segment not
found", response = ErrorInfo.class), @ApiResponse(code = 400, message = "Bad
request", response = ErrorInfo.class)})
+ public String uploadLLCSegment(
+ @ApiParam(value = "Name of the REALTIME table", required = true)
@PathParam("realtimeTableName") String realtimeTableName,
+ @ApiParam(value = "Name of the segment", required = true)
@PathParam("segmentName") String segmentName) throws Exception {
+ LOGGER.info("Received a request to upload low level consumer segment {}
for table {}", segmentName, realtimeTableName);
+
+ // Check it's realtime table
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
+ if (TableType.OFFLINE == tableType) {
+ throw new WebApplicationException(
+ String.format("Cannot upload low level consumer segment for
OFFLINE table: %s", realtimeTableName),
+ Response.Status.BAD_REQUEST);
+ }
+
+ // Check the segment is low level consumer segment
+ if (!LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
+ throw new WebApplicationException(
+ String.format("Segment %s is not a low level consumer segment",
segmentName),
+ Response.Status.BAD_REQUEST);
+ }
+
+ String tableNameWithType =
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(realtimeTableName);
+ TableDataManager tableDataManager =
checkGetTableDataManager(tableNameWithType);
+ SegmentDataManager segmentDataManager =
tableDataManager.acquireSegment(segmentName);
+ if (segmentDataManager == null) {
+ throw new WebApplicationException(
+ String.format("Table %s segment %s does not exist",
realtimeTableName, segmentName),
+ Response.Status.NOT_FOUND);
+ }
+
+ File segmentTarFile = null;
+ try {
+ // Create the tar.gz segment file in the server's segmentTarUploadDir
folder with a unique file name.
+ File segmentTarUploadDir =
+ new
File(_serverInstance.getInstanceDataManager().getSegmentFileDirectory(),
SEGMENT_UPLOAD_DIR);
+ segmentTarUploadDir.mkdir();
+
+ segmentTarFile = new File(segmentTarUploadDir, tableNameWithType + "_" +
segmentName + "_" + UUID.randomUUID()
+ + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ TarGzCompressionUtils.createTarGzFile(new
File(tableDataManager.getTableDataDir(), segmentName), segmentTarFile);
+
+ // Use segment uploader to upload the segment tar file to segment store
and return the segment download url.
+ SegmentUploader segmentUploader =
_serverInstance.getInstanceDataManager().getSegmentUploader();
+ URI segmentDownloadUrl = segmentUploader.uploadSegment(segmentTarFile,
new LLCSegmentName(segmentName));
+ if (segmentDownloadUrl == null) {
+ throw new WebApplicationException(
+ String.format("Failed to upload table %s segment %s to segment
store", realtimeTableName, segmentName),
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ return segmentDownloadUrl.getPath();
+ } finally {
+ FileUtils.deleteQuietly(segmentTarFile);
+ tableDataManager.releaseSegment(segmentDataManager);
+ }
+ }
+
@GET
@Path("tables/{realtimeTableName}/consumingSegmentsInfo")
@Produces(MediaType.APPLICATION_JSON)
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 7849793..ebfa53a 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -42,6 +42,8 @@ import org.apache.pinot.core.data.manager.TableDataManager;
import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
import
org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender;
+import org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploader;
+import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
@@ -72,6 +74,7 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
private ServerMetrics _serverMetrics;
private ZkHelixPropertyStore<ZNRecord> _propertyStore;
private String _authToken;
+ private SegmentUploader _segmentUploader;
@Override
public synchronized void init(PinotConfiguration config, HelixManager
helixManager, ServerMetrics serverMetrics)
@@ -84,6 +87,8 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
_helixManager = helixManager;
_serverMetrics = serverMetrics;
_authToken =
config.getProperty(CommonConstants.Server.CONFIG_OF_AUTH_TOKEN);
+ _segmentUploader = new
PinotFSSegmentUploader(_instanceDataManagerConfig.getSegmentStoreUri(),
+ PinotFSSegmentUploader.DEFAULT_SEGMENT_UPLOAD_TIMEOUT_MILLIS);
File instanceDataDir = new
File(_instanceDataManagerConfig.getInstanceDataDir());
if (!instanceDataDir.exists()) {
@@ -355,4 +360,9 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
public ZkHelixPropertyStore<ZNRecord> getPropertyStore() {
return _propertyStore;
}
+
+ @Override
+ public SegmentUploader getSegmentUploader() {
+ return _segmentUploader;
+ }
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
index 0e4f60f..ac09b77 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
@@ -29,6 +29,8 @@ import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static
org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SEGMENT_STORE_URI;
+
/**
* The config used for HelixInstanceDataManager.
@@ -150,6 +152,11 @@ public class HelixInstanceDataManagerConfig implements
InstanceDataManagerConfig
}
@Override
+ public String getSegmentStoreUri() {
+ return
_instanceDataManagerConfiguration.getProperty(CONFIG_OF_SEGMENT_STORE_URI);
+ }
+
+ @Override
public ReadMode getReadMode() {
return
ReadMode.valueOf(_instanceDataManagerConfiguration.getProperty(READ_MODE));
}
diff --git
a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
index f3af60b..60c04fc 100644
---
a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
+++
b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
@@ -19,6 +19,7 @@
package org.apache.pinot.server.api;
import java.io.File;
+import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
@@ -30,15 +31,16 @@ import javax.ws.rs.client.WebTarget;
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.http.util.NetUtils;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.segment.ReadMode;
import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.NetUtil;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.TableDataManager;
import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
+import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
@@ -55,7 +57,9 @@ import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
-import static org.mockito.Mockito.anyString;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -64,6 +68,9 @@ public abstract class BaseResourceTest {
private static final String AVRO_DATA_PATH = "data/test_data-mv.avro";
private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(),
"BaseResourceTest");
protected static final String TABLE_NAME = "testTable";
+ protected static final String LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS = new
LLCSegmentName(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), 1, 0,
System.currentTimeMillis()).getSegmentName();
+ protected static final String LLC_SEGMENT_NAME_FOR_UPLOAD_FAILURE = new
LLCSegmentName(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), 2, 0,
System.currentTimeMillis()).getSegmentName();
+ protected static final String SEGMENT_DOWNLOAD_URL =
"testSegmentDownloadUrl";
private final Map<String, TableDataManager> _tableDataManagerMap = new
HashMap<>();
protected final List<ImmutableSegment> _realtimeIndexSegments = new
ArrayList<>();
@@ -93,14 +100,21 @@ public abstract class BaseResourceTest {
when(serverInstance.getInstanceDataManager()).thenReturn(instanceDataManager);
when(serverInstance.getInstanceDataManager().getSegmentFileDirectory())
.thenReturn(FileUtils.getTempDirectoryPath());
+
+ // Mock the segment uploader
+ SegmentUploader segmentUploader = mock(SegmentUploader.class);
+ when(segmentUploader.uploadSegment(any(File.class), eq(new
LLCSegmentName(LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS)))).thenReturn(new
URI(SEGMENT_DOWNLOAD_URL));
+ when(segmentUploader.uploadSegment(any(File.class), eq(new
LLCSegmentName(LLC_SEGMENT_NAME_FOR_UPLOAD_FAILURE)))).thenReturn(null);
+ when(instanceDataManager.getSegmentUploader()).thenReturn(segmentUploader);
+
// Add the default tables and segments.
String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME);
String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME);
addTable(realtimeTableName);
addTable(offlineTableName);
- setUpSegment(realtimeTableName, "default", _realtimeIndexSegments);
- setUpSegment(offlineTableName, "default", _offlineIndexSegments);
+ setUpSegment(realtimeTableName, null, "default", _realtimeIndexSegments);
+ setUpSegment(offlineTableName, null, "default", _offlineIndexSegments);
_adminApiApplication = new AdminApiApplication(serverInstance, new
AllowAllAccessFactory());
_adminApiApplication.start(Collections.singletonList(new
ListenerConfig(CommonConstants.HTTP_PROTOCOL, "0.0.0.0",
@@ -127,16 +141,16 @@ public abstract class BaseResourceTest {
throws Exception {
List<ImmutableSegment> immutableSegments = new ArrayList<>();
for (int i = 0; i < numSegments; i++) {
- immutableSegments.add(setUpSegment(tableNameWithType,
Integer.toString(_realtimeIndexSegments.size()), segments));
+ immutableSegments.add(setUpSegment(tableNameWithType, null,
Integer.toString(_realtimeIndexSegments.size()), segments));
}
return immutableSegments;
}
- protected ImmutableSegment setUpSegment(String tableNameWithType, String
segmentNamePostfix,
- List<ImmutableSegment> segments)
+ protected ImmutableSegment setUpSegment(String tableNameWithType, String
segmentName, String segmentNamePostfix, List<ImmutableSegment> segments)
throws Exception {
SegmentGeneratorConfig config =
SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(_avroFile,
INDEX_DIR, tableNameWithType);
+ config.setSegmentName(segmentName);
config.setSegmentNamePostfix(segmentNamePostfix);
SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
driver.init(config);
diff --git
a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
index 44d58bf..b3863aa 100644
---
a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
+++
b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
@@ -83,7 +83,7 @@ public class TablesResourceTest extends BaseResourceTest {
Assert.assertEquals(segmentNames.get(0),
_realtimeIndexSegments.get(0).getSegmentName());
IndexSegment secondSegment =
- setUpSegment(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
"0", _realtimeIndexSegments);
+ setUpSegment(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
null, "0", _realtimeIndexSegments);
tableSegments =
_webTarget.path(segmentsPath).request().get(TableSegments.class);
Assert.assertNotNull(tableSegments);
segmentNames = tableSegments.getSegments();
@@ -212,6 +212,36 @@ public class TablesResourceTest extends BaseResourceTest {
}
@Test
+ public void testUploadSegments() throws Exception {
+ setUpSegment(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS, null, _realtimeIndexSegments);
+ setUpSegment(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
LLC_SEGMENT_NAME_FOR_UPLOAD_FAILURE, null, _realtimeIndexSegments);
+
+ // Verify that uploading the segment succeeds.
+ Response response =
_webTarget.path(String.format("/segments/%s/%s/upload",
TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
+ LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS)).request().post(null);
+ Assert.assertEquals(response.getStatus(),
Response.Status.OK.getStatusCode());
+ Assert.assertEquals(response.readEntity(String.class),
SEGMENT_DOWNLOAD_URL);
+
+ // Verify bad request: table type is offline
+ response = _webTarget.path(String.format("/segments/%s/%s/upload",
TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME),
_offlineIndexSegments.get(0).getSegmentName())).request().post(null);
+ Assert.assertEquals(response.getStatus(),
Response.Status.BAD_REQUEST.getStatusCode());
+
+ // Verify bad request: segment is not low level consumer segment
+ response = _webTarget.path(String.format("/segments/%s/%s/upload",
TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
_realtimeIndexSegments.get(0).getSegmentName())).request().post(null);
+ Assert.assertEquals(response.getStatus(),
Response.Status.BAD_REQUEST.getStatusCode());
+
+ // Verify that uploading a non-existent segment fails with NOT_FOUND status.
+ response = _webTarget.path(String.format("/segments/%s/%s_dummy/upload",
TABLE_NAME,
+ LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS)).request().post(null);
+ Assert.assertEquals(response.getStatus(),
Response.Status.NOT_FOUND.getStatusCode());
+
+ // Verify that a failed upload to the segment store returns an internal server
error.
+ response = _webTarget.path(String.format("/segments/%s/%s/upload",
TABLE_NAME,
+ LLC_SEGMENT_NAME_FOR_UPLOAD_FAILURE)).request().post(null);
+ Assert.assertEquals(response.getStatus(),
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
+ }
+
+ @Test
public void testOfflineTableSegmentMetadata()
throws Exception {
IndexSegment defaultSegment = _offlineIndexSegments.get(0);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]