This is an automated email from the ASF dual-hosted git repository.

tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c6d08e  add uploadLLCSegment endpoint in TableResource (#6653)
6c6d08e is described below

commit 6c6d08ebbc0439a4c1359c95480c6fa767033f20
Author: Chang <33030174+liuchang0...@users.noreply.github.com>
AuthorDate: Wed Mar 31 09:10:13 2021 -0700

    add uploadLLCSegment endpoint in TableResource (#6653)
    
    * add uploadLLCSegment endpoint in TableResource
    
    * add upload to rest path
    
    * use single class import
    
    * update comment; throw exception when upload fails
    
    * add constant for config: segment.store.uri
    
    * add comment for getSegmentUploader
---
 .../apache/pinot/common/utils/CommonConstants.java |  3 +
 .../core/data/manager/InstanceDataManager.java     |  6 ++
 .../manager/config/InstanceDataManagerConfig.java  |  2 +
 .../segment/index/loader/IndexLoadingConfig.java   |  5 +-
 .../pinot/server/api/resources/TablesResource.java | 70 ++++++++++++++++++++++
 .../starter/helix/HelixInstanceDataManager.java    | 10 ++++
 .../helix/HelixInstanceDataManagerConfig.java      |  7 +++
 .../apache/pinot/server/api/BaseResourceTest.java  | 28 ++++++---
 .../pinot/server/api/TablesResourceTest.java       | 32 +++++++++-
 9 files changed, 153 insertions(+), 10 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index b9beb55..8ed12fe 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -307,6 +307,9 @@ public class CommonConstants {
     public static final String SERVER_TLS_PREFIX = "pinot.server.tls";
     public static final String SERVER_NETTYTLS_PREFIX = 
"pinot.server.nettytls";
 
+    // The complete config key is pinot.server.instance.segment.store.uri
+    public static final String CONFIG_OF_SEGMENT_STORE_URI = 
"segment.store.uri";
+
     public static class SegmentCompletionProtocol {
       public static final String PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER = 
"pinot.server.segment.uploader";
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index a28e069..f43ff63 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -30,6 +30,7 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
 import org.apache.pinot.spi.env.PinotConfiguration;
 
@@ -132,4 +133,9 @@ public interface InstanceDataManager {
    * Returns the Helix property store.
    */
   ZkHelixPropertyStore<ZNRecord> getPropertyStore();
+
+  /**
+   * Returns the segment uploader, which uploads a llc segment to the 
destination place and returns the url of uploaded segment file. Servers utilize 
segment uploader to upload llc segment to segment store.
+   */
+  SegmentUploader getSegmentUploader();
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java
index cf2aeb9..c03fb20 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/InstanceDataManagerConfig.java
@@ -35,6 +35,8 @@ public interface InstanceDataManagerConfig {
 
   String getInstanceBootstrapSegmentDir();
 
+  String getSegmentStoreUri();
+
   ReadMode getReadMode();
 
   String getSegmentFormatVersion();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
index f3a2644..0436eae 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
@@ -37,13 +37,14 @@ import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 
+import static 
org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SEGMENT_STORE_URI;
+
 
 /**
  * Table level index loading config.
  */
 public class IndexLoadingConfig {
   private static final int DEFAULT_REALTIME_AVG_MULTI_VALUE_COUNT = 2;
-  private static final String SEGMENT_STORE_URI = "segment.store.uri";
 
   private ReadMode _readMode = ReadMode.DEFAULT_MODE;
   private List<String> _sortedColumns = Collections.emptyList();
@@ -235,7 +236,7 @@ public class IndexLoadingConfig {
       _realtimeAvgMultiValueCount = Integer.valueOf(avgMultiValueCount);
     }
     _enableSplitCommitEndWithMetadata = 
instanceDataManagerConfig.isEnableSplitCommitEndWithMetadata();
-    _segmentStoreURI = 
instanceDataManagerConfig.getConfig().getProperty(SEGMENT_STORE_URI);
+    _segmentStoreURI = 
instanceDataManagerConfig.getConfig().getProperty(CONFIG_OF_SEGMENT_STORE_URI);
   }
 
   /**
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index e5d533c..174d687 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -24,6 +24,7 @@ import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import java.io.File;
+import java.net.URI;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -34,6 +35,7 @@ import javax.inject.Inject;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.Encoded;
 import javax.ws.rs.GET;
+import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
@@ -49,11 +51,13 @@ import 
org.apache.pinot.common.restlet.resources.SegmentConsumerInfo;
 import org.apache.pinot.common.restlet.resources.ResourceUtils;
 import org.apache.pinot.common.restlet.resources.TableSegments;
 import org.apache.pinot.common.restlet.resources.TablesList;
+import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.SegmentDataManager;
 import org.apache.pinot.core.data.manager.TableDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.server.api.access.AccessControl;
 import org.apache.pinot.server.api.access.AccessControlFactory;
@@ -69,6 +73,7 @@ import org.slf4j.LoggerFactory;
 public class TablesResource {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TablesResource.class);
   private static final String PEER_SEGMENT_DOWNLOAD_DIR = 
"peerSegmentDownloadDir";
+  private static final String SEGMENT_UPLOAD_DIR = "segmentUploadDir";
 
   @Inject
   private ServerInstance _serverInstance;
@@ -244,6 +249,71 @@ public class TablesResource {
     }
   }
 
+  /**
+   * Upload a low level consumer segment to segment store and return the 
segment download url. This endpoint is used when segment store copy is 
unavailable for committed low level consumer segments.
+   * Please note that invocation of this endpoint may cause query performance 
to suffer, since we tar up the segment to upload it.
+   * @see <a 
href="https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion#BypassingdeepstorerequirementforRealtimesegmentcompletion-Failurecasesandhandling";>By-passing
 deep-store requirement for Realtime segment completion:Failure cases and 
handling</a>
+   */
+  @POST
+  @Path("/segments/{realtimeTableName}/{segmentName}/upload")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Upload a low level consumer segment to segment store 
and return the segment download url", notes = "Upload a low level consumer 
segment to segment store and return the segment download url")
+  @ApiResponses(value = {@ApiResponse(code = 200, message = "Success"), 
@ApiResponse(code = 500, message = "Internal server error", response = 
ErrorInfo.class), @ApiResponse(code = 404, message = "Table or segment not 
found", response = ErrorInfo.class), @ApiResponse(code = 400, message = "Bad 
request", response = ErrorInfo.class)})
+  public String uploadLLCSegment(
+          @ApiParam(value = "Name of the REALTIME table", required = true) 
@PathParam("realtimeTableName") String realtimeTableName,
+          @ApiParam(value = "Name of the segment", required = true) 
@PathParam("segmentName") String segmentName) throws Exception {
+    LOGGER.info("Received a request to upload low level consumer segment {} 
for table {}", segmentName, realtimeTableName);
+
+    // Check it's realtime table
+    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
+    if (TableType.OFFLINE == tableType) {
+      throw new WebApplicationException(
+              String.format("Cannot upload low level consumer segment for 
OFFLINE table: %s", realtimeTableName),
+              Response.Status.BAD_REQUEST);
+    }
+
+    // Check the segment is low level consumer segment
+    if (!LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
+      throw new WebApplicationException(
+              String.format("Segment %s is not a low level consumer segment", 
segmentName),
+              Response.Status.BAD_REQUEST);
+    }
+
+    String tableNameWithType = 
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(realtimeTableName);
+    TableDataManager tableDataManager = 
checkGetTableDataManager(tableNameWithType);
+    SegmentDataManager segmentDataManager = 
tableDataManager.acquireSegment(segmentName);
+    if (segmentDataManager == null) {
+      throw new WebApplicationException(
+              String.format("Table %s segment %s does not exist", 
realtimeTableName, segmentName),
+              Response.Status.NOT_FOUND);
+    }
+
+    File segmentTarFile = null;
+    try {
+      // Create the tar.gz segment file in the server's segmentTarUploadDir 
folder with a unique file name.
+      File segmentTarUploadDir =
+              new 
File(_serverInstance.getInstanceDataManager().getSegmentFileDirectory(), 
SEGMENT_UPLOAD_DIR);
+      segmentTarUploadDir.mkdir();
+
+      segmentTarFile = new File(segmentTarUploadDir, tableNameWithType + "_" + 
segmentName + "_" + UUID.randomUUID()
+              + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+      TarGzCompressionUtils.createTarGzFile(new 
File(tableDataManager.getTableDataDir(), segmentName), segmentTarFile);
+
+      // Use segment uploader to upload the segment tar file to segment store 
and return the segment download url.
+      SegmentUploader segmentUploader = 
_serverInstance.getInstanceDataManager().getSegmentUploader();
+      URI segmentDownloadUrl = segmentUploader.uploadSegment(segmentTarFile, 
new LLCSegmentName(segmentName));
+      if (segmentDownloadUrl == null) {
+        throw new WebApplicationException(
+            String.format("Failed to upload table %s segment %s to segment 
store", realtimeTableName, segmentName),
+            Response.Status.INTERNAL_SERVER_ERROR);
+      }
+      return segmentDownloadUrl.getPath();
+    } finally {
+      FileUtils.deleteQuietly(segmentTarFile);
+      tableDataManager.releaseSegment(segmentDataManager);
+    }
+  }
+
   @GET
   @Path("tables/{realtimeTableName}/consumingSegmentsInfo")
   @Produces(MediaType.APPLICATION_JSON)
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 7849793..ebfa53a 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -42,6 +42,8 @@ import org.apache.pinot.core.data.manager.TableDataManager;
 import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
 import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
 import 
org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender;
+import org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploader;
+import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
@@ -72,6 +74,7 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
   private ServerMetrics _serverMetrics;
   private ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private String _authToken;
+  private SegmentUploader _segmentUploader;
 
   @Override
   public synchronized void init(PinotConfiguration config, HelixManager 
helixManager, ServerMetrics serverMetrics)
@@ -84,6 +87,8 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
     _helixManager = helixManager;
     _serverMetrics = serverMetrics;
     _authToken = 
config.getProperty(CommonConstants.Server.CONFIG_OF_AUTH_TOKEN);
+    _segmentUploader = new 
PinotFSSegmentUploader(_instanceDataManagerConfig.getSegmentStoreUri(),
+            PinotFSSegmentUploader.DEFAULT_SEGMENT_UPLOAD_TIMEOUT_MILLIS);
 
     File instanceDataDir = new 
File(_instanceDataManagerConfig.getInstanceDataDir());
     if (!instanceDataDir.exists()) {
@@ -355,4 +360,9 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
   public ZkHelixPropertyStore<ZNRecord> getPropertyStore() {
     return _propertyStore;
   }
+
+  @Override
+  public SegmentUploader getSegmentUploader() {
+    return _segmentUploader;
+  }
 }
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
index 0e4f60f..ac09b77 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
@@ -29,6 +29,8 @@ import org.apache.pinot.spi.env.PinotConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static 
org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SEGMENT_STORE_URI;
+
 
 /**
  * The config used for HelixInstanceDataManager.
@@ -150,6 +152,11 @@ public class HelixInstanceDataManagerConfig implements 
InstanceDataManagerConfig
   }
 
   @Override
+  public String getSegmentStoreUri() {
+    return 
_instanceDataManagerConfiguration.getProperty(CONFIG_OF_SEGMENT_STORE_URI);
+  }
+
+  @Override
   public ReadMode getReadMode() {
     return 
ReadMode.valueOf(_instanceDataManagerConfiguration.getProperty(READ_MODE));
   }
diff --git 
a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java 
b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
index f3af60b..60c04fc 100644
--- 
a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
+++ 
b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.server.api;
 
 import java.io.File;
+import java.net.URI;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -30,15 +31,16 @@ import javax.ws.rs.client.WebTarget;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.http.util.NetUtils;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.NetUtil;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.TableDataManager;
 import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
 import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
+import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
@@ -55,7 +57,9 @@ import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 
-import static org.mockito.Mockito.anyString;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -64,6 +68,9 @@ public abstract class BaseResourceTest {
   private static final String AVRO_DATA_PATH = "data/test_data-mv.avro";
   private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), 
"BaseResourceTest");
   protected static final String TABLE_NAME = "testTable";
+  protected static final String LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS = new 
LLCSegmentName(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), 1, 0, 
System.currentTimeMillis()).getSegmentName();
+  protected static final String LLC_SEGMENT_NAME_FOR_UPLOAD_FAILURE = new 
LLCSegmentName(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), 2, 0, 
System.currentTimeMillis()).getSegmentName();
+  protected static final String SEGMENT_DOWNLOAD_URL = 
"testSegmentDownloadUrl";
 
   private final Map<String, TableDataManager> _tableDataManagerMap = new 
HashMap<>();
   protected final List<ImmutableSegment> _realtimeIndexSegments = new 
ArrayList<>();
@@ -93,14 +100,21 @@ public abstract class BaseResourceTest {
     
when(serverInstance.getInstanceDataManager()).thenReturn(instanceDataManager);
     when(serverInstance.getInstanceDataManager().getSegmentFileDirectory())
         .thenReturn(FileUtils.getTempDirectoryPath());
+
+    // Mock the segment uploader
+    SegmentUploader segmentUploader = mock(SegmentUploader.class);
+    when(segmentUploader.uploadSegment(any(File.class), eq(new 
LLCSegmentName(LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS)))).thenReturn(new 
URI(SEGMENT_DOWNLOAD_URL));
+    when(segmentUploader.uploadSegment(any(File.class), eq(new 
LLCSegmentName(LLC_SEGMENT_NAME_FOR_UPLOAD_FAILURE)))).thenReturn(null);
+    when(instanceDataManager.getSegmentUploader()).thenReturn(segmentUploader);
+
     // Add the default tables and segments.
     String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME);
     String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME);
 
     addTable(realtimeTableName);
     addTable(offlineTableName);
-    setUpSegment(realtimeTableName, "default", _realtimeIndexSegments);
-    setUpSegment(offlineTableName, "default", _offlineIndexSegments);
+    setUpSegment(realtimeTableName, null, "default", _realtimeIndexSegments);
+    setUpSegment(offlineTableName, null, "default", _offlineIndexSegments);
 
     _adminApiApplication = new AdminApiApplication(serverInstance, new 
AllowAllAccessFactory());
     _adminApiApplication.start(Collections.singletonList(new 
ListenerConfig(CommonConstants.HTTP_PROTOCOL, "0.0.0.0",
@@ -127,16 +141,16 @@ public abstract class BaseResourceTest {
       throws Exception {
     List<ImmutableSegment> immutableSegments = new ArrayList<>();
     for (int i = 0; i < numSegments; i++) {
-      immutableSegments.add(setUpSegment(tableNameWithType, 
Integer.toString(_realtimeIndexSegments.size()), segments));
+      immutableSegments.add(setUpSegment(tableNameWithType, null, 
Integer.toString(_realtimeIndexSegments.size()), segments));
     }
     return immutableSegments;
   }
 
-  protected ImmutableSegment setUpSegment(String tableNameWithType, String 
segmentNamePostfix,
-      List<ImmutableSegment> segments)
+  protected ImmutableSegment setUpSegment(String tableNameWithType, String 
segmentName, String segmentNamePostfix, List<ImmutableSegment> segments)
       throws Exception {
     SegmentGeneratorConfig config =
         SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(_avroFile, 
INDEX_DIR, tableNameWithType);
+    config.setSegmentName(segmentName);
     config.setSegmentNamePostfix(segmentNamePostfix);
     SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
     driver.init(config);
diff --git 
a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
 
b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
index 44d58bf..b3863aa 100644
--- 
a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
+++ 
b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
@@ -83,7 +83,7 @@ public class TablesResourceTest extends BaseResourceTest {
     Assert.assertEquals(segmentNames.get(0), 
_realtimeIndexSegments.get(0).getSegmentName());
 
     IndexSegment secondSegment =
-        setUpSegment(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), 
"0", _realtimeIndexSegments);
+        setUpSegment(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), 
null, "0", _realtimeIndexSegments);
     tableSegments = 
_webTarget.path(segmentsPath).request().get(TableSegments.class);
     Assert.assertNotNull(tableSegments);
     segmentNames = tableSegments.getSegments();
@@ -212,6 +212,36 @@ public class TablesResourceTest extends BaseResourceTest {
   }
 
   @Test
+  public void testUploadSegments() throws Exception {
+    setUpSegment(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), 
LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS, null, _realtimeIndexSegments);
+    setUpSegment(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), 
LLC_SEGMENT_NAME_FOR_UPLOAD_FAILURE, null, _realtimeIndexSegments);
+
+    // Verify segment uploading succeed.
+    Response response = 
_webTarget.path(String.format("/segments/%s/%s/upload", 
TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
+        LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS)).request().post(null);
+    Assert.assertEquals(response.getStatus(), 
Response.Status.OK.getStatusCode());
+    Assert.assertEquals(response.readEntity(String.class), 
SEGMENT_DOWNLOAD_URL);
+
+    // Verify bad request: table type is offline
+    response = _webTarget.path(String.format("/segments/%s/%s/upload", 
TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME), 
_offlineIndexSegments.get(0).getSegmentName())).request().post(null);
+    Assert.assertEquals(response.getStatus(), 
Response.Status.BAD_REQUEST.getStatusCode());
+
+    // Verify bad request: segment is not low level consumer segment
+    response = _webTarget.path(String.format("/segments/%s/%s/upload", 
TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), 
_realtimeIndexSegments.get(0).getSegmentName())).request().post(null);
+    Assert.assertEquals(response.getStatus(), 
Response.Status.BAD_REQUEST.getStatusCode());
+
+    // Verify non-existent segment uploading fail with NOT_FOUND status.
+    response = _webTarget.path(String.format("/segments/%s/%s_dummy/upload", 
TABLE_NAME,
+        LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS)).request().post(null);
+    Assert.assertEquals(response.getStatus(), 
Response.Status.NOT_FOUND.getStatusCode());
+
+    // Verify fail to upload segment to segment store with internal server 
error.
+    response = _webTarget.path(String.format("/segments/%s/%s/upload", 
TABLE_NAME,
+        LLC_SEGMENT_NAME_FOR_UPLOAD_FAILURE)).request().post(null);
+    Assert.assertEquals(response.getStatus(), 
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
+  }
+
+  @Test
   public void testOfflineTableSegmentMetadata()
       throws Exception {
     IndexSegment defaultSegment = _offlineIndexSegments.get(0);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to