This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7d695d8 Make realtime server upload timeout configurable (#4170)
7d695d8 is described below
commit 7d695d82864771eb9077dee089b9d34c0cd768fb
Author: Neha Pawar <[email protected]>
AuthorDate: Fri Apr 26 16:42:24 2019 -0700
Make realtime server upload timeout configurable (#4170)
* Make default upload timeout 5m
* Move config strings and defaults to
CommonConstants.Server.SegmentUploadProtocol
---
.../org/apache/pinot/common/utils/CommonConstants.java | 12 +++++++++++-
.../api/resources/LLCSegmentCompletionHandlers.java | 14 +++++++-------
.../helix/core/realtime/SegmentCompletionManager.java | 4 ++--
.../realtime/ServerSegmentCompletionProtocolHandler.java | 16 ++++++++--------
.../pinot/server/starter/helix/HelixServerStarter.java | 4 ++--
5 files changed, 30 insertions(+), 20 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index fee6962..7542cc5 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -214,11 +214,21 @@ public class CommonConstants {
public static final String DEFAULT_REQUEST_HANDLER_FACTORY_CLASS =
"org.apache.pinot.server.request.SimpleRequestHandlerFactory";
public static final String PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY =
"pinot.server.segment.fetcher";
- public static final String PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER =
"pinot.server.segment.uploader";
public static final String DEFAULT_STAR_TREE_FORMAT_VERSION = "OFF_HEAP";
public static final String DEFAULT_COLUMN_MIN_MAX_VALUE_GENERATOR_MODE =
"TIME";
public static final long DEFAULT_MAX_SHUTDOWN_WAIT_TIME_MS = 600_000L;
public static final long DEFAULT_CHECK_INTERVAL_TIME_MS = 60_000L;
+
+ public static class SegmentCompletionProtocol {
+ public static final String PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER =
"pinot.server.segment.uploader";
+
+ public static final String CONFIG_OF_CONTROLLER_HTTPS_ENABLED =
"enabled";
+ public static final String CONFIG_OF_CONTROLLER_HTTPS_PORT =
"controller.port";
+ public static final String CONFIG_OF_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS =
"upload.request.timeout.ms";
+
+ public static final int DEFAULT_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS =
300_000;
+ public static final int DEFAULT_OTHER_REQUESTS_TIMEOUT = 10_000;
+ }
}
public static class Controller {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
index bbd70b3..84df735 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
@@ -135,7 +135,7 @@ public class LLCSegmentCompletionHandlers {
SegmentCompletionProtocol.Response response =
_segmentCompletionManager.segmentConsumed(requestParams);
final String responseStr = response.toJsonString();
- LOGGER.info("Response to segmentConsumed:{}", responseStr);
+ LOGGER.info("Response to segmentConsumed for segment:{} is :{}",
segmentName, responseStr);
return responseStr;
}
@@ -158,7 +158,7 @@ public class LLCSegmentCompletionHandlers {
SegmentCompletionProtocol.Response response =
_segmentCompletionManager.segmentStoppedConsuming(requestParams);
final String responseStr = response.toJsonString();
- LOGGER.info("Response to segmentStoppedConsuming:{}", responseStr);
+ LOGGER.info("Response to segmentStoppedConsuming for segment:{} is:{}",
segmentName, responseStr);
return responseStr;
}
@@ -188,7 +188,7 @@ public class LLCSegmentCompletionHandlers {
SegmentCompletionProtocol.Response response =
_segmentCompletionManager.segmentCommitStart(requestParams);
final String responseStr = response.toJsonString();
- LOGGER.info("Response to segmentCommitStart:{}", responseStr);
+ LOGGER.info("Response to segmentCommitStart for segment:{} is:{}",
segmentName, responseStr);
return responseStr;
}
@@ -236,7 +236,7 @@ public class LLCSegmentCompletionHandlers {
SegmentCompletionProtocol.Response response = _segmentCompletionManager
.segmentCommitEnd(requestParams, isSuccess, isSplitCommit,
committingSegmentDescriptor);
final String responseStr = response.toJsonString();
- LOGGER.info("Response to segmentCommitEnd:{}", responseStr);
+ LOGGER.info("Response to segmentCommitEnd for segment:{} is:{}",
segmentName, responseStr);
return responseStr;
}
@@ -362,7 +362,7 @@ public class LLCSegmentCompletionHandlers {
String response = new
SegmentCompletionProtocol.Response(responseParams).toJsonString();
- LOGGER.info("Response to segmentUpload:{}", response);
+ LOGGER.info("Response to segmentUpload for segment:{} is:{}",
segmentName, response);
return response;
} catch (Exception e) {
@@ -399,7 +399,7 @@ public class LLCSegmentCompletionHandlers {
.withSegmentLocation(segmentLocation).withSegmentSizeBytes(segmentSizeBytes)
.withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis).withNumRows(numRows)
.withMemoryUsedBytes(memoryUsedBytes);
- LOGGER.info("Processing segmentCommitEnd:{}", requestParams.toString());
+ LOGGER.info("Processing segmentCommitEndWithMetadata:{}",
requestParams.toString());
final boolean isSuccess = true;
final boolean isSplitCommit = true;
@@ -413,7 +413,7 @@ public class LLCSegmentCompletionHandlers {
.segmentCommitEnd(requestParams, isSuccess, isSplitCommit,
CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams,
segmentMetadata));
final String responseStr = response.toJsonString();
- LOGGER.info("Response to segmentCommitEnd:{}", responseStr);
+ LOGGER.info("Response to segmentCommitEndWithMetadata for segment:{}
is:{}", segmentName, responseStr);
return responseStr;
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 2613570..a4da6b4 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -511,7 +511,7 @@ public class SegmentCompletionManager {
return SegmentCompletionProtocol.RESP_FAILED;
}
synchronized (this) {
- LOGGER.info("Processing segmentCommit({}, {})", instanceId, offset);
+ LOGGER.info("Processing segmentCommitStart({}, {})", instanceId,
offset);
switch (_state) {
case PARTIAL_CONSUMING:
return PARTIAL_CONSUMING__commit(instanceId, offset, now);
@@ -614,7 +614,7 @@ public class SegmentCompletionManager {
LOGGER.warn("Not accepting commitEnd from {} since it had stoppd
consuming", instanceId);
return abortAndReturnFailed();
}
- LOGGER.info("Processing segmentCommit({}, {})", instanceId, offset);
+ LOGGER.info("Processing segmentCommitEnd({}, {})", instanceId, offset);
if (!_state.equals(State.COMMITTER_UPLOADING) ||
!instanceId.equals(_winner) || offset != _winningOffset) {
// State changed while we were out of sync. return a failed commit.
LOGGER.warn("State change during upload: state={} segment={}
winner={} winningOffset={}", _state,
diff --git
a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
index 2261c55..bcfa7ee 100644
---
a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
@@ -33,6 +33,8 @@ import org.apache.pinot.core.query.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static
org.apache.pinot.common.utils.CommonConstants.Server.SegmentCompletionProtocol.*;
+
/**
* A class that handles sending segment completion protocol requests to the
controller and getting
@@ -40,16 +42,12 @@ import org.slf4j.LoggerFactory;
*/
public class ServerSegmentCompletionProtocolHandler {
private static Logger LOGGER =
LoggerFactory.getLogger(ServerSegmentCompletionProtocolHandler.class);
- private static final int SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS = 30_000;
- private static final int OTHER_REQUESTS_TIMEOUT = 10_000;
private static final String HTTPS_PROTOCOL = "https";
private static final String HTTP_PROTOCOL = "http";
- private static final String CONFIG_OF_CONTROLLER_HTTPS_ENABLED = "enabled";
- private static final String CONFIG_OF_CONTROLLER_HTTPS_PORT =
"controller.port";
-
private static SSLContext _sslContext;
private static Integer _controllerHttpsPort;
+ private static int _segmentUploadRequestTimeoutMs;
private final FileUploadDownloadClient _fileUploadDownloadClient;
private final ServerMetrics _serverMetrics;
@@ -60,6 +58,8 @@ public class ServerSegmentCompletionProtocolHandler {
_sslContext = new
ClientSSLContextGenerator(httpsConfig.subset(CommonConstants.PREFIX_OF_SSL_SUBSET)).generate();
_controllerHttpsPort =
httpsConfig.getInt(CONFIG_OF_CONTROLLER_HTTPS_PORT);
}
+ _segmentUploadRequestTimeoutMs =
+ uploaderConfig.getInt(CONFIG_OF_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS,
DEFAULT_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS);
}
public ServerSegmentCompletionProtocolHandler(ServerMetrics serverMetrics) {
@@ -177,7 +177,7 @@ public class ServerSegmentCompletionProtocolHandler {
SegmentCompletionProtocol.Response response;
try {
String responseStr =
- _fileUploadDownloadClient.sendSegmentCompletionProtocolRequest(new
URI(url), OTHER_REQUESTS_TIMEOUT)
+ _fileUploadDownloadClient.sendSegmentCompletionProtocolRequest(new
URI(url), DEFAULT_OTHER_REQUESTS_TIMEOUT)
.getResponse();
response =
SegmentCompletionProtocol.Response.fromJsonString(responseStr);
LOGGER.info("Controller response {} for {}", response.toJsonString(),
url);
@@ -201,7 +201,7 @@ public class ServerSegmentCompletionProtocolHandler {
SegmentCompletionProtocol.Response response;
try {
String responseStr = _fileUploadDownloadClient
- .uploadSegmentMetadataFiles(new URI(url), metadataFiles,
SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS).getResponse();
+ .uploadSegmentMetadataFiles(new URI(url), metadataFiles,
_segmentUploadRequestTimeoutMs).getResponse();
response =
SegmentCompletionProtocol.Response.fromJsonString(responseStr);
LOGGER.info("Controller response {} for {}", response.toJsonString(),
url);
if
(response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER))
{
@@ -224,7 +224,7 @@ public class ServerSegmentCompletionProtocolHandler {
SegmentCompletionProtocol.Response response;
try {
String responseStr = _fileUploadDownloadClient
- .uploadSegment(new URI(url), segmentName, segmentTarFile, null,
null, SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS)
+ .uploadSegment(new URI(url), segmentName, segmentTarFile, null,
null, _segmentUploadRequestTimeoutMs)
.getResponse();
response =
SegmentCompletionProtocol.Response.fromJsonString(responseStr);
LOGGER.info("Controller response {} for {}", response.toJsonString(),
url);
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
index 5d3ed26..a026593 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
@@ -145,8 +145,8 @@ public class HelixServerStarter {
Utils.logVersions();
ServerConf serverInstanceConfig =
DefaultHelixStarterServerConfig.getDefaultHelixServerConfig(_helixServerConfig);
// Need to do this before we start receiving state transitions.
- ServerSegmentCompletionProtocolHandler
-
.init(_helixServerConfig.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER));
+ ServerSegmentCompletionProtocolHandler.init(_helixServerConfig.subset(
+
CommonConstants.Server.SegmentCompletionProtocol.PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER));
_serverInstance = new ServerInstance();
_serverInstance.init(serverInstanceConfig, propertyStore);
_serverInstance.start();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]