This is an automated email from the ASF dual-hosted git repository. tingchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 6c6d08e add uploadLLCSegment endpoint in TableResource (#6653) 6c6d08e is described below commit 6c6d08ebbc0439a4c1359c95480c6fa767033f20 Author: Chang <33030174+liuchang0...@users.noreply.github.com> AuthorDate: Wed Mar 31 09:10:13 2021 -0700 add uploadLLCSegment endpoint in TableResource (#6653) * add uploadLLCSegment endpoint in TableResource * add upload to rest path * use single class import * update comment; throw exception when upload fails * add constant for config: segment.store.uri * add comment for getSegmentUploader --- .../apache/pinot/common/utils/CommonConstants.java | 3 + .../core/data/manager/InstanceDataManager.java | 6 ++ .../manager/config/InstanceDataManagerConfig.java | 2 + .../segment/index/loader/IndexLoadingConfig.java | 5 +- .../pinot/server/api/resources/TablesResource.java | 70 ++++++++++++++++++++++ .../starter/helix/HelixInstanceDataManager.java | 10 ++++ .../helix/HelixInstanceDataManagerConfig.java | 7 +++ .../apache/pinot/server/api/BaseResourceTest.java | 28 ++++++--- .../pinot/server/api/TablesResourceTest.java | 32 +++++++++- 9 files changed, 153 insertions(+), 10 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java index b9beb55..8ed12fe 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java @@ -307,6 +307,9 @@ public class CommonConstants { public static final String SERVER_TLS_PREFIX = "pinot.server.tls"; public static final String SERVER_NETTYTLS_PREFIX = "pinot.server.nettytls"; + // The complete config key is pinot.server.instance.segment.store.uri + public static final String CONFIG_OF_SEGMENT_STORE_URI = "segment.store.uri"; + public static class SegmentCompletionProtocol { public static final String PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER = "pinot.server.segment.uploader"; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java index a28e069..f43ff63 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java @@ -30,6 +30,7 @@ import org.apache.helix.HelixManager; import org.apache.helix.ZNRecord; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.core.data.manager.realtime.SegmentUploader; import org.apache.pinot.core.segment.index.metadata.SegmentMetadata; import org.apache.pinot.spi.env.PinotConfiguration; @@ -132,4 +133,9 @@ public interface InstanceDataManager { * Returns the Helix property store. */ ZkHelixPropertyStore<ZNRecord> getPropertyStore(); + + /** + * Returns the segment uploader, which uploads a llc segment to the destination place and returns the url of uploaded segment file. Servers utilize segment uploader to upload llc segment to segment store. + */ + SegmentUploader getSegmentUploader(); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java index cf2aeb9..c03fb20 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java @@ -35,6 +35,8 @@ public interface InstanceDataManagerConfig { String getInstanceBootstrapSegmentDir(); + String getSegmentStoreUri(); + ReadMode getReadMode(); String getSegmentFormatVersion(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java index f3a2644..0436eae 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java @@ -37,13 +37,14 @@ import org.apache.pinot.spi.config.table.IndexingConfig; import org.apache.pinot.spi.config.table.StarTreeIndexConfig; import org.apache.pinot.spi.config.table.TableConfig; +import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SEGMENT_STORE_URI; + /** * Table level index loading config. */ public class IndexLoadingConfig { private static final int DEFAULT_REALTIME_AVG_MULTI_VALUE_COUNT = 2; - private static final String SEGMENT_STORE_URI = "segment.store.uri"; private ReadMode _readMode = ReadMode.DEFAULT_MODE; private List<String> _sortedColumns = Collections.emptyList(); @@ -235,7 +236,7 @@ public class IndexLoadingConfig { _realtimeAvgMultiValueCount = Integer.valueOf(avgMultiValueCount); } _enableSplitCommitEndWithMetadata = instanceDataManagerConfig.isEnableSplitCommitEndWithMetadata(); - _segmentStoreURI = instanceDataManagerConfig.getConfig().getProperty(SEGMENT_STORE_URI); + _segmentStoreURI = instanceDataManagerConfig.getConfig().getProperty(CONFIG_OF_SEGMENT_STORE_URI); } /** diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java index e5d533c..174d687 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java @@ -24,6 +24,7 @@ import io.swagger.annotations.ApiParam; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; import java.io.File; +import java.net.URI; import java.nio.file.Files; import java.util.ArrayList; import java.util.HashMap; @@ -34,6 +35,7 @@ import javax.inject.Inject; import javax.ws.rs.DefaultValue; import javax.ws.rs.Encoded; import javax.ws.rs.GET; +import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; @@ -49,11 +51,13 @@ import org.apache.pinot.common.restlet.resources.SegmentConsumerInfo; import org.apache.pinot.common.restlet.resources.ResourceUtils; import org.apache.pinot.common.restlet.resources.TableSegments; import org.apache.pinot.common.restlet.resources.TablesList; +import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.SegmentDataManager; import org.apache.pinot.core.data.manager.TableDataManager; import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; +import org.apache.pinot.core.data.manager.realtime.SegmentUploader; import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl; import org.apache.pinot.server.api.access.AccessControl; import org.apache.pinot.server.api.access.AccessControlFactory; @@ -69,6 +73,7 @@ import org.slf4j.LoggerFactory; public class TablesResource { private static final Logger LOGGER = LoggerFactory.getLogger(TablesResource.class); private static final String PEER_SEGMENT_DOWNLOAD_DIR = "peerSegmentDownloadDir"; + private static final String SEGMENT_UPLOAD_DIR = "segmentUploadDir"; @Inject private ServerInstance _serverInstance; @@ -244,6 +249,71 @@ public class TablesResource { } } + /** + * Upload a low level consumer segment to segment store and return the segment download url. This endpoint is used when segment store copy is unavailable for committed low level consumer segments. + * Please note that invocation of this endpoint may cause query performance to suffer, since we tar up the segment to upload it. + * @see <a href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling">By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a> + */ + @POST + @Path("/segments/{realtimeTableName}/{segmentName}/upload") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Upload a low level consumer segment to segment store and return the segment download url", notes = "Upload a low level consumer segment to segment store and return the segment download url") + @ApiResponses(value = {@ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error", response = ErrorInfo.class), @ApiResponse(code = 404, message = "Table or segment not found", response = ErrorInfo.class), @ApiResponse(code = 400, message = "Bad request", response = ErrorInfo.class)}) + public String uploadLLCSegment( + @ApiParam(value = "Name of the REALTIME table", required = true) @PathParam("realtimeTableName") String realtimeTableName, + @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") String segmentName) throws Exception { + LOGGER.info("Received a request to upload low level consumer segment {} for table {}", segmentName, realtimeTableName); + + // Check it's realtime table + TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName); + if (TableType.OFFLINE == tableType) { + throw new WebApplicationException( + String.format("Cannot upload low level consumer segment for OFFLINE table: %s", realtimeTableName), + Response.Status.BAD_REQUEST); + } + + // Check the segment is low level consumer segment + if (!LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) { + throw new WebApplicationException( + String.format("Segment %s is not a low level consumer segment", segmentName), + Response.Status.BAD_REQUEST); + } + + String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(realtimeTableName); + TableDataManager tableDataManager = checkGetTableDataManager(tableNameWithType); + SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName); + if (segmentDataManager == null) { + throw new WebApplicationException( + String.format("Table %s segment %s does not exist", realtimeTableName, segmentName), + Response.Status.NOT_FOUND); + } + + File segmentTarFile = null; + try { + // Create the tar.gz segment file in the server's segmentTarUploadDir folder with a unique file name. + File segmentTarUploadDir = + new File(_serverInstance.getInstanceDataManager().getSegmentFileDirectory(), SEGMENT_UPLOAD_DIR); + segmentTarUploadDir.mkdir(); + + segmentTarFile = new File(segmentTarUploadDir, tableNameWithType + "_" + segmentName + "_" + UUID.randomUUID() + + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); + TarGzCompressionUtils.createTarGzFile(new File(tableDataManager.getTableDataDir(), segmentName), segmentTarFile); + + // Use segment uploader to upload the segment tar file to segment store and return the segment download url. + SegmentUploader segmentUploader = _serverInstance.getInstanceDataManager().getSegmentUploader(); + URI segmentDownloadUrl = segmentUploader.uploadSegment(segmentTarFile, new LLCSegmentName(segmentName)); + if (segmentDownloadUrl == null) { + throw new WebApplicationException( + String.format("Failed to upload table %s segment %s to segment store", realtimeTableName, segmentName), + Response.Status.INTERNAL_SERVER_ERROR); + } + return segmentDownloadUrl.getPath(); + } finally { + FileUtils.deleteQuietly(segmentTarFile); + tableDataManager.releaseSegment(segmentDataManager); + } + } + @GET @Path("tables/{realtimeTableName}/consumingSegmentsInfo") @Produces(MediaType.APPLICATION_JSON) diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index 7849793..ebfa53a 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -42,6 +42,8 @@ import org.apache.pinot.core.data.manager.TableDataManager; import org.apache.pinot.core.data.manager.config.TableDataManagerConfig; import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider; import org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender; +import org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploader; +import org.apache.pinot.core.data.manager.realtime.SegmentUploader; import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment; import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader; import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl; @@ -72,6 +74,7 @@ public class HelixInstanceDataManager implements InstanceDataManager { private ServerMetrics _serverMetrics; private ZkHelixPropertyStore<ZNRecord> _propertyStore; private String _authToken; + private SegmentUploader _segmentUploader; @Override public synchronized void init(PinotConfiguration config, HelixManager helixManager, ServerMetrics serverMetrics) @@ -84,6 +87,8 @@ public class HelixInstanceDataManager implements InstanceDataManager { _helixManager = helixManager; _serverMetrics = serverMetrics; _authToken = config.getProperty(CommonConstants.Server.CONFIG_OF_AUTH_TOKEN); + _segmentUploader = new PinotFSSegmentUploader(_instanceDataManagerConfig.getSegmentStoreUri(), + PinotFSSegmentUploader.DEFAULT_SEGMENT_UPLOAD_TIMEOUT_MILLIS); File instanceDataDir = new File(_instanceDataManagerConfig.getInstanceDataDir()); if (!instanceDataDir.exists()) { @@ -355,4 +360,9 @@ public class HelixInstanceDataManager implements InstanceDataManager { public ZkHelixPropertyStore<ZNRecord> getPropertyStore() { return _propertyStore; } + + @Override + public SegmentUploader getSegmentUploader() { + return _segmentUploader; + } } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java index 0e4f60f..ac09b77 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java @@ -29,6 +29,8 @@ import org.apache.pinot.spi.env.PinotConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SEGMENT_STORE_URI; + /** * The config used for HelixInstanceDataManager. @@ -150,6 +152,11 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig } @Override + public String getSegmentStoreUri() { + return _instanceDataManagerConfiguration.getProperty(CONFIG_OF_SEGMENT_STORE_URI); + } + + @Override public ReadMode getReadMode() { return ReadMode.valueOf(_instanceDataManagerConfiguration.getProperty(READ_MODE)); } diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java index f3af60b..60c04fc 100644 --- a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java +++ b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java @@ -19,6 +19,7 @@ package org.apache.pinot.server.api; import java.io.File; +import java.net.URI; import java.net.URL; import java.util.ArrayList; import java.util.Collections; @@ -30,15 +31,16 @@ import javax.ws.rs.client.WebTarget; import org.apache.commons.io.FileUtils; import org.apache.helix.HelixManager; import org.apache.helix.store.zk.ZkHelixPropertyStore; -import org.apache.http.util.NetUtils; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.segment.ReadMode; import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.common.utils.NetUtil; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.TableDataManager; import org.apache.pinot.core.data.manager.config.TableDataManagerConfig; import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager; +import org.apache.pinot.core.data.manager.realtime.SegmentUploader; import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment; import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader; @@ -55,7 +57,9 @@ import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; -import static org.mockito.Mockito.anyString; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -64,6 +68,9 @@ public abstract class BaseResourceTest { private static final String AVRO_DATA_PATH = "data/test_data-mv.avro"; private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "BaseResourceTest"); protected static final String TABLE_NAME = "testTable"; + protected static final String LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS = new LLCSegmentName(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), 1, 0, System.currentTimeMillis()).getSegmentName(); + protected static final String LLC_SEGMENT_NAME_FOR_UPLOAD_FAILURE = new LLCSegmentName(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), 2, 0, System.currentTimeMillis()).getSegmentName(); + protected static final String SEGMENT_DOWNLOAD_URL = "testSegmentDownloadUrl"; private final Map<String, TableDataManager> _tableDataManagerMap = new HashMap<>(); protected final List<ImmutableSegment> _realtimeIndexSegments = new ArrayList<>(); @@ -93,14 +100,21 @@ public abstract class BaseResourceTest { when(serverInstance.getInstanceDataManager()).thenReturn(instanceDataManager); when(serverInstance.getInstanceDataManager().getSegmentFileDirectory()) .thenReturn(FileUtils.getTempDirectoryPath()); + + // Mock the segment uploader + SegmentUploader segmentUploader = mock(SegmentUploader.class); + when(segmentUploader.uploadSegment(any(File.class), eq(new LLCSegmentName(LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS)))).thenReturn(new URI(SEGMENT_DOWNLOAD_URL)); + when(segmentUploader.uploadSegment(any(File.class), eq(new LLCSegmentName(LLC_SEGMENT_NAME_FOR_UPLOAD_FAILURE)))).thenReturn(null); + when(instanceDataManager.getSegmentUploader()).thenReturn(segmentUploader); + // Add the default tables and segments. String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME); String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME); addTable(realtimeTableName); addTable(offlineTableName); - setUpSegment(realtimeTableName, "default", _realtimeIndexSegments); - setUpSegment(offlineTableName, "default", _offlineIndexSegments); + setUpSegment(realtimeTableName, null, "default", _realtimeIndexSegments); + setUpSegment(offlineTableName, null, "default", _offlineIndexSegments); _adminApiApplication = new AdminApiApplication(serverInstance, new AllowAllAccessFactory()); _adminApiApplication.start(Collections.singletonList(new ListenerConfig(CommonConstants.HTTP_PROTOCOL, "0.0.0.0", @@ -127,16 +141,16 @@ public abstract class BaseResourceTest { throws Exception { List<ImmutableSegment> immutableSegments = new ArrayList<>(); for (int i = 0; i < numSegments; i++) { - immutableSegments.add(setUpSegment(tableNameWithType, Integer.toString(_realtimeIndexSegments.size()), segments)); + immutableSegments.add(setUpSegment(tableNameWithType, null, Integer.toString(_realtimeIndexSegments.size()), segments)); } return immutableSegments; } - protected ImmutableSegment setUpSegment(String tableNameWithType, String segmentNamePostfix, - List<ImmutableSegment> segments) + protected ImmutableSegment setUpSegment(String tableNameWithType, String segmentName, String segmentNamePostfix, List<ImmutableSegment> segments) throws Exception { SegmentGeneratorConfig config = SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(_avroFile, INDEX_DIR, tableNameWithType); + config.setSegmentName(segmentName); config.setSegmentNamePostfix(segmentNamePostfix); SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl(); driver.init(config); diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java index 44d58bf..b3863aa 100644 --- a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java +++ b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java @@ -83,7 +83,7 @@ public class TablesResourceTest extends BaseResourceTest { Assert.assertEquals(segmentNames.get(0), _realtimeIndexSegments.get(0).getSegmentName()); IndexSegment secondSegment = - setUpSegment(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), "0", _realtimeIndexSegments); + setUpSegment(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), null, "0", _realtimeIndexSegments); tableSegments = _webTarget.path(segmentsPath).request().get(TableSegments.class); Assert.assertNotNull(tableSegments); segmentNames = tableSegments.getSegments(); @@ -212,6 +212,36 @@ public class TablesResourceTest extends BaseResourceTest { } @Test + public void testUploadSegments() throws Exception { + setUpSegment(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS, null, _realtimeIndexSegments); + setUpSegment(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), LLC_SEGMENT_NAME_FOR_UPLOAD_FAILURE, null, _realtimeIndexSegments); + + // Verify segment uploading succeed. + Response response = _webTarget.path(String.format("/segments/%s/%s/upload", TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), + LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS)).request().post(null); + Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode()); + Assert.assertEquals(response.readEntity(String.class), SEGMENT_DOWNLOAD_URL); + + // Verify bad request: table type is offline + response = _webTarget.path(String.format("/segments/%s/%s/upload", TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME), _offlineIndexSegments.get(0).getSegmentName())).request().post(null); + Assert.assertEquals(response.getStatus(), Response.Status.BAD_REQUEST.getStatusCode()); + + // Verify bad request: segment is not low level consumer segment + response = _webTarget.path(String.format("/segments/%s/%s/upload", TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), _realtimeIndexSegments.get(0).getSegmentName())).request().post(null); + Assert.assertEquals(response.getStatus(), Response.Status.BAD_REQUEST.getStatusCode()); + + // Verify non-existent segment uploading fail with NOT_FOUND status. + response = _webTarget.path(String.format("/segments/%s/%s_dummy/upload", TABLE_NAME, + LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS)).request().post(null); + Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode()); + + // Verify fail to upload segment to segment store with internal server error. + response = _webTarget.path(String.format("/segments/%s/%s/upload", TABLE_NAME, + LLC_SEGMENT_NAME_FOR_UPLOAD_FAILURE)).request().post(null); + Assert.assertEquals(response.getStatus(), Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); + } + + @Test public void testOfflineTableSegmentMetadata() throws Exception { IndexSegment defaultSegment = _offlineIndexSegments.get(0); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org