This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d695d8  Make realtime server upload timeout configurable (#4170)
7d695d8 is described below

commit 7d695d82864771eb9077dee089b9d34c0cd768fb
Author: Neha Pawar <[email protected]>
AuthorDate: Fri Apr 26 16:42:24 2019 -0700

    Make realtime server upload timeout configurable (#4170)
    
    * Make default upload timeout 5m
    
    * Move config strings and defaults to 
CommonConstants.Server.SegmentCompletionProtocol
---
 .../org/apache/pinot/common/utils/CommonConstants.java   | 12 +++++++++++-
 .../api/resources/LLCSegmentCompletionHandlers.java      | 14 +++++++-------
 .../helix/core/realtime/SegmentCompletionManager.java    |  4 ++--
 .../realtime/ServerSegmentCompletionProtocolHandler.java | 16 ++++++++--------
 .../pinot/server/starter/helix/HelixServerStarter.java   |  4 ++--
 5 files changed, 30 insertions(+), 20 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index fee6962..7542cc5 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -214,11 +214,21 @@ public class CommonConstants {
     public static final String DEFAULT_REQUEST_HANDLER_FACTORY_CLASS =
         "org.apache.pinot.server.request.SimpleRequestHandlerFactory";
     public static final String PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY = 
"pinot.server.segment.fetcher";
-    public static final String PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER = 
"pinot.server.segment.uploader";
     public static final String DEFAULT_STAR_TREE_FORMAT_VERSION = "OFF_HEAP";
     public static final String DEFAULT_COLUMN_MIN_MAX_VALUE_GENERATOR_MODE = 
"TIME";
     public static final long DEFAULT_MAX_SHUTDOWN_WAIT_TIME_MS = 600_000L;
     public static final long DEFAULT_CHECK_INTERVAL_TIME_MS = 60_000L;
+
+    public static class SegmentCompletionProtocol {
+      public static final String PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER = 
"pinot.server.segment.uploader";
+
+      public static final String CONFIG_OF_CONTROLLER_HTTPS_ENABLED = 
"enabled";
+      public static final String CONFIG_OF_CONTROLLER_HTTPS_PORT = 
"controller.port";
+      public static final String CONFIG_OF_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS = 
"upload.request.timeout.ms";
+
+      public static final int DEFAULT_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS = 
300_000;
+      public static final int DEFAULT_OTHER_REQUESTS_TIMEOUT = 10_000;
+    }
   }
 
   public static class Controller {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
index bbd70b3..84df735 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
@@ -135,7 +135,7 @@ public class LLCSegmentCompletionHandlers {
 
     SegmentCompletionProtocol.Response response = 
_segmentCompletionManager.segmentConsumed(requestParams);
     final String responseStr = response.toJsonString();
-    LOGGER.info("Response to segmentConsumed:{}", responseStr);
+    LOGGER.info("Response to segmentConsumed for segment:{} is :{}", 
segmentName, responseStr);
     return responseStr;
   }
 
@@ -158,7 +158,7 @@ public class LLCSegmentCompletionHandlers {
     SegmentCompletionProtocol.Response response =
         _segmentCompletionManager.segmentStoppedConsuming(requestParams);
     final String responseStr = response.toJsonString();
-    LOGGER.info("Response to segmentStoppedConsuming:{}", responseStr);
+    LOGGER.info("Response to segmentStoppedConsuming for segment:{} is:{}", 
segmentName, responseStr);
     return responseStr;
   }
 
@@ -188,7 +188,7 @@ public class LLCSegmentCompletionHandlers {
     SegmentCompletionProtocol.Response response =
         _segmentCompletionManager.segmentCommitStart(requestParams);
     final String responseStr = response.toJsonString();
-    LOGGER.info("Response to segmentCommitStart:{}", responseStr);
+    LOGGER.info("Response to segmentCommitStart for segment:{} is:{}", 
segmentName, responseStr);
     return responseStr;
   }
 
@@ -236,7 +236,7 @@ public class LLCSegmentCompletionHandlers {
     SegmentCompletionProtocol.Response response = _segmentCompletionManager
         .segmentCommitEnd(requestParams, isSuccess, isSplitCommit, 
committingSegmentDescriptor);
     final String responseStr = response.toJsonString();
-    LOGGER.info("Response to segmentCommitEnd:{}", responseStr);
+    LOGGER.info("Response to segmentCommitEnd for segment:{} is:{}", 
segmentName, responseStr);
     return responseStr;
   }
 
@@ -362,7 +362,7 @@ public class LLCSegmentCompletionHandlers {
 
       String response = new 
SegmentCompletionProtocol.Response(responseParams).toJsonString();
 
-      LOGGER.info("Response to segmentUpload:{}", response);
+      LOGGER.info("Response to segmentUpload for segment:{} is:{}", 
segmentName, response);
 
       return response;
     } catch (Exception e) {
@@ -399,7 +399,7 @@ public class LLCSegmentCompletionHandlers {
         
.withSegmentLocation(segmentLocation).withSegmentSizeBytes(segmentSizeBytes)
         
.withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis).withNumRows(numRows)
         .withMemoryUsedBytes(memoryUsedBytes);
-    LOGGER.info("Processing segmentCommitEnd:{}", requestParams.toString());
+    LOGGER.info("Processing segmentCommitEndWithMetadata:{}", 
requestParams.toString());
 
     final boolean isSuccess = true;
     final boolean isSplitCommit = true;
@@ -413,7 +413,7 @@ public class LLCSegmentCompletionHandlers {
         .segmentCommitEnd(requestParams, isSuccess, isSplitCommit,
             
CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams,
 segmentMetadata));
     final String responseStr = response.toJsonString();
-    LOGGER.info("Response to segmentCommitEnd:{}", responseStr);
+    LOGGER.info("Response to segmentCommitEndWithMetadata for segment:{} 
is:{}", segmentName, responseStr);
     return responseStr;
   }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 2613570..a4da6b4 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -511,7 +511,7 @@ public class SegmentCompletionManager {
         return SegmentCompletionProtocol.RESP_FAILED;
       }
       synchronized (this) {
-        LOGGER.info("Processing segmentCommit({}, {})", instanceId, offset);
+        LOGGER.info("Processing segmentCommitStart({}, {})", instanceId, 
offset);
         switch (_state) {
           case PARTIAL_CONSUMING:
             return PARTIAL_CONSUMING__commit(instanceId, offset, now);
@@ -614,7 +614,7 @@ public class SegmentCompletionManager {
           LOGGER.warn("Not accepting commitEnd from {} since it had stoppd 
consuming", instanceId);
           return abortAndReturnFailed();
         }
-        LOGGER.info("Processing segmentCommit({}, {})", instanceId, offset);
+        LOGGER.info("Processing segmentCommitEnd({}, {})", instanceId, offset);
         if (!_state.equals(State.COMMITTER_UPLOADING) || 
!instanceId.equals(_winner) || offset != _winningOffset) {
           // State changed while we were out of sync. return a failed commit.
           LOGGER.warn("State change during upload: state={} segment={} 
winner={} winningOffset={}", _state,
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
 
b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
index 2261c55..bcfa7ee 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
@@ -33,6 +33,8 @@ import org.apache.pinot.core.query.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static 
org.apache.pinot.common.utils.CommonConstants.Server.SegmentCompletionProtocol.*;
+
 
 /**
  * A class that handles sending segment completion protocol requests to the 
controller and getting
@@ -40,16 +42,12 @@ import org.slf4j.LoggerFactory;
  */
 public class ServerSegmentCompletionProtocolHandler {
   private static Logger LOGGER = 
LoggerFactory.getLogger(ServerSegmentCompletionProtocolHandler.class);
-  private static final int SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS = 30_000;
-  private static final int OTHER_REQUESTS_TIMEOUT = 10_000;
   private static final String HTTPS_PROTOCOL = "https";
   private static final String HTTP_PROTOCOL = "http";
 
-  private static final String CONFIG_OF_CONTROLLER_HTTPS_ENABLED = "enabled";
-  private static final String CONFIG_OF_CONTROLLER_HTTPS_PORT = 
"controller.port";
-
   private static SSLContext _sslContext;
   private static Integer _controllerHttpsPort;
+  private static int _segmentUploadRequestTimeoutMs;
 
   private final FileUploadDownloadClient _fileUploadDownloadClient;
   private final ServerMetrics _serverMetrics;
@@ -60,6 +58,8 @@ public class ServerSegmentCompletionProtocolHandler {
       _sslContext = new 
ClientSSLContextGenerator(httpsConfig.subset(CommonConstants.PREFIX_OF_SSL_SUBSET)).generate();
       _controllerHttpsPort = 
httpsConfig.getInt(CONFIG_OF_CONTROLLER_HTTPS_PORT);
     }
+    _segmentUploadRequestTimeoutMs =
+        uploaderConfig.getInt(CONFIG_OF_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS, 
DEFAULT_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS);
   }
 
   public ServerSegmentCompletionProtocolHandler(ServerMetrics serverMetrics) {
@@ -177,7 +177,7 @@ public class ServerSegmentCompletionProtocolHandler {
     SegmentCompletionProtocol.Response response;
     try {
       String responseStr =
-          _fileUploadDownloadClient.sendSegmentCompletionProtocolRequest(new 
URI(url), OTHER_REQUESTS_TIMEOUT)
+          _fileUploadDownloadClient.sendSegmentCompletionProtocolRequest(new 
URI(url), DEFAULT_OTHER_REQUESTS_TIMEOUT)
               .getResponse();
       response = 
SegmentCompletionProtocol.Response.fromJsonString(responseStr);
       LOGGER.info("Controller response {} for {}", response.toJsonString(), 
url);
@@ -201,7 +201,7 @@ public class ServerSegmentCompletionProtocolHandler {
     SegmentCompletionProtocol.Response response;
     try {
       String responseStr = _fileUploadDownloadClient
-          .uploadSegmentMetadataFiles(new URI(url), metadataFiles, 
SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS).getResponse();
+          .uploadSegmentMetadataFiles(new URI(url), metadataFiles, 
_segmentUploadRequestTimeoutMs).getResponse();
       response = 
SegmentCompletionProtocol.Response.fromJsonString(responseStr);
       LOGGER.info("Controller response {} for {}", response.toJsonString(), 
url);
       if 
(response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER))
 {
@@ -224,7 +224,7 @@ public class ServerSegmentCompletionProtocolHandler {
     SegmentCompletionProtocol.Response response;
     try {
       String responseStr = _fileUploadDownloadClient
-          .uploadSegment(new URI(url), segmentName, segmentTarFile, null, 
null, SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS)
+          .uploadSegment(new URI(url), segmentName, segmentTarFile, null, 
null, _segmentUploadRequestTimeoutMs)
           .getResponse();
       response = 
SegmentCompletionProtocol.Response.fromJsonString(responseStr);
       LOGGER.info("Controller response {} for {}", response.toJsonString(), 
url);
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
index 5d3ed26..a026593 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
@@ -145,8 +145,8 @@ public class HelixServerStarter {
     Utils.logVersions();
     ServerConf serverInstanceConfig = 
DefaultHelixStarterServerConfig.getDefaultHelixServerConfig(_helixServerConfig);
     // Need to do this before we start receiving state transitions.
-    ServerSegmentCompletionProtocolHandler
-        
.init(_helixServerConfig.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER));
+    ServerSegmentCompletionProtocolHandler.init(_helixServerConfig.subset(
+        
CommonConstants.Server.SegmentCompletionProtocol.PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER));
     _serverInstance = new ServerInstance();
     _serverInstance.init(serverInstanceConfig, propertyStore);
     _serverInstance.start();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to