This is an automated email from the ASF dual-hosted git repository.
tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1763e70f79 Minor Realtime Segment Commit Upload Improvements (#10725)
1763e70f79 is described below
commit 1763e70f79ed1328e3e33ae3817557f927d5fa38
Author: Ankit Sultana <[email protected]>
AuthorDate: Thu May 11 00:17:04 2023 +0530
Minor Realtime Segment Commit Upload Improvements (#10725)
* Improve Monitoring for Realtime Segment Commit + Configurable Segment
Upload Timeout
* Add metric for upload timeout
* Minor refactor
* Add metrics for success/failure
* Set timeout in HelixInstanceDataManager as well
---
.../pinot/common/metrics/ControllerMeter.java | 1 +
.../apache/pinot/common/metrics/ServerMeter.java | 4 ++++
.../apache/pinot/common/metrics/ServerTimer.java | 2 ++
.../realtime/PinotLLCRealtimeSegmentManager.java | 5 +++++
.../manager/realtime/PinotFSSegmentUploader.java | 22 +++++++++++++++++++++-
.../manager/realtime/SegmentCommitterFactory.java | 4 ++--
.../realtime/Server2ControllerSegmentUploader.java | 18 ++++++++++++++++--
.../ServerSegmentCompletionProtocolHandler.java | 2 +-
.../realtime/PinotFSSegmentUploaderTest.java | 11 +++++++----
.../Server2ControllerSegmentUploaderTest.java | 4 ++--
.../starter/helix/HelixInstanceDataManager.java | 3 ++-
11 files changed, 63 insertions(+), 13 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
index edd70ee642..758be1550a 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
@@ -57,6 +57,7 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
CRON_SCHEDULER_JOB_TRIGGERED("cronSchedulerJobTriggered", false),
CRON_SCHEDULER_JOB_SKIPPED("cronSchedulerJobSkipped", false),
LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR("LLCSegmentDeepStoreUploadRetryError",
false),
+ SEGMENT_MISSING_DEEP_STORE_LINK("RealtimeSegmentMissingDeepStoreLink",
false),
NUMBER_ADHOC_TASKS_SUBMITTED("adhocTasks", false);
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index ff292a5092..435ec53566 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -74,6 +74,10 @@ public enum ServerMeter implements AbstractMetrics.Meter {
SEGMENT_DOWNLOAD_FAILURES("segments", false),
SEGMENT_DOWNLOAD_FROM_REMOTE_FAILURES("segments", false),
SEGMENT_DOWNLOAD_FROM_PEERS_FAILURES("segments", false),
+ SEGMENT_UPLOAD_FAILURE("segments", false),
+ SEGMENT_UPLOAD_SUCCESS("segments", false),
+ // Emitted only by Server to Deep-store segment uploader.
+ SEGMENT_UPLOAD_TIMEOUT("segments", false),
NUM_RESIZES("numResizes", false),
NO_TABLE_ACCESS("tables", true),
INDEXING_FAILURES("attributeValues", true),
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
index 5631aeda8e..aa0952730b 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
@@ -44,6 +44,8 @@ public enum ServerTimer implements AbstractMetrics.Timer {
RESPONSE_SER_CPU_TIME_NS("nanoseconds", false, "Query cost (response
serialization cpu time) "
+ "for query processing on server. Computed as the time spent in
serializing query response on servers"),
+ SEGMENT_UPLOAD_TIME_MS("milliseconds", false),
+
TOTAL_CPU_TIME_NS("nanoseconds", false, "Total query cost (thread cpu time +
system "
+ "activities cpu time + response serialization cpu time) for query
processing on server.");
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index e17dc8d46d..455ff15894 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -36,6 +36,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
+import org.apache.commons.lang.StringUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.Criteria;
import org.apache.helix.HelixAdmin;
@@ -606,6 +607,10 @@ public class PinotLLCRealtimeSegmentManager {
// Trigger the metadata event notifier
_metadataEventNotifierFactory.create().notifyOnSegmentFlush(tableConfig);
+
+ if (StringUtils.isBlank(committingSegmentDescriptor.getSegmentLocation()))
{
+ _controllerMetrics.addMeteredTableValue(realtimeTableName,
ControllerMeter.SEGMENT_MISSING_DEEP_STORE_LINK, 1);
+ }
}
/**
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
index 2325a44e19..9a34872a50 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
@@ -26,10 +26,15 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.utils.StringUtil;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,10 +51,12 @@ public class PinotFSSegmentUploader implements
SegmentUploader {
private final String _segmentStoreUriStr;
private final ExecutorService _executorService =
Executors.newCachedThreadPool();
private final int _timeoutInMs;
+ private final ServerMetrics _serverMetrics;
- public PinotFSSegmentUploader(String segmentStoreDirUri, int timeoutMillis) {
+ public PinotFSSegmentUploader(String segmentStoreDirUri, int timeoutMillis,
ServerMetrics serverMetrics) {
_segmentStoreUriStr = segmentStoreDirUri;
_timeoutInMs = timeoutMillis;
+ _serverMetrics = serverMetrics;
}
public URI uploadSegment(File segmentFile, LLCSegmentName segmentName) {
@@ -58,9 +65,11 @@ public class PinotFSSegmentUploader implements
SegmentUploader {
segmentName.getSegmentName());
return null;
}
+ final String rawTableName =
TableNameBuilder.extractRawTableName(segmentName.getTableName());
Callable<URI> uploadTask = () -> {
URI destUri = new URI(StringUtil.join(File.separator,
_segmentStoreUriStr, segmentName.getTableName(),
segmentName.getSegmentName() + UUID.randomUUID().toString()));
+ long startTime = System.currentTimeMillis();
try {
PinotFS pinotFS = PinotFSFactory.create(new
URI(_segmentStoreUriStr).getScheme());
// Check and delete any existing segment file.
@@ -71,6 +80,10 @@ public class PinotFSSegmentUploader implements
SegmentUploader {
return destUri;
} catch (Exception e) {
LOGGER.warn("Failed copy segment tar file {} to segment store {}: {}",
segmentFile.getName(), destUri, e);
+ } finally {
+ long duration = System.currentTimeMillis() - startTime;
+ _serverMetrics.addTimedTableValue(rawTableName,
ServerTimer.SEGMENT_UPLOAD_TIME_MS, duration,
+ TimeUnit.MILLISECONDS);
}
return null;
};
@@ -78,14 +91,21 @@ public class PinotFSSegmentUploader implements
SegmentUploader {
try {
URI segmentLocation = future.get(_timeoutInMs, TimeUnit.MILLISECONDS);
LOGGER.info("Successfully upload segment {} to {}.", segmentName,
segmentLocation);
+ _serverMetrics.addMeteredTableValue(rawTableName,
+ segmentLocation == null ? ServerMeter.SEGMENT_UPLOAD_FAILURE :
ServerMeter.SEGMENT_UPLOAD_SUCCESS, 1);
return segmentLocation;
} catch (InterruptedException e) {
LOGGER.info("Interrupted while waiting for segment upload of {} to {}.",
segmentName, _segmentStoreUriStr);
Thread.currentThread().interrupt();
+ } catch (TimeoutException e) {
+ // Emit a separate metric for timeout since this is relatively more
common than other errors.
+ _serverMetrics.addMeteredTableValue(rawTableName,
ServerMeter.SEGMENT_UPLOAD_TIMEOUT, 1);
+ LOGGER.warn("Timed out waiting to upload segment: {} for table: {}",
segmentName.getSegmentName(), rawTableName);
} catch (Exception e) {
LOGGER
.warn("Failed to upload file {} of segment {} for table {} ",
segmentFile.getAbsolutePath(), segmentName, e);
}
+ _serverMetrics.addMeteredTableValue(rawTableName,
ServerMeter.SEGMENT_UPLOAD_FAILURE, 1);
return null;
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
index 37ff2e5a82..297f30482b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
@@ -74,13 +74,13 @@ public class SegmentCommitterFactory {
if (uploadToFs || peerSegmentDownloadScheme != null) {
// TODO: peer scheme non-null check exists for backwards compatibility.
remove check once users have migrated
segmentUploader = new PinotFSSegmentUploader(segmentStoreUri,
- PinotFSSegmentUploader.DEFAULT_SEGMENT_UPLOAD_TIMEOUT_MILLIS);
+
ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(),
_serverMetrics);
} else {
segmentUploader = new Server2ControllerSegmentUploader(_logger,
_protocolHandler.getFileUploadDownloadClient(),
_protocolHandler.getSegmentCommitUploadURL(params,
controllerVipUrl), params.getSegmentName(),
ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(),
_serverMetrics,
- _protocolHandler.getAuthProvider());
+ _protocolHandler.getAuthProvider(), _tableConfig.getTableName());
}
return new SplitSegmentCommitter(_logger, _protocolHandler, params,
segmentUploader, peerSegmentDownloadScheme);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
index 2559616b49..5aa5b8b266 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
@@ -21,14 +21,18 @@ package org.apache.pinot.core.data.manager.realtime;
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.auth.AuthProviderUtils;
+import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.core.util.SegmentCompletionProtocolUtils;
import org.apache.pinot.server.realtime.ControllerLeaderLocator;
import org.apache.pinot.spi.auth.AuthProvider;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
@@ -42,10 +46,11 @@ public class Server2ControllerSegmentUploader implements
SegmentUploader {
private final int _segmentUploadRequestTimeoutMs;
private final ServerMetrics _serverMetrics;
private final AuthProvider _authProvider;
+ private final String _rawTableName;
public Server2ControllerSegmentUploader(Logger segmentLogger,
FileUploadDownloadClient fileUploadDownloadClient,
String controllerSegmentUploadCommitUrl, String segmentName, int
segmentUploadRequestTimeoutMs,
- ServerMetrics serverMetrics, AuthProvider authProvider)
+ ServerMetrics serverMetrics, AuthProvider authProvider, String tableName)
throws URISyntaxException {
_segmentLogger = segmentLogger;
_fileUploadDownloadClient = fileUploadDownloadClient;
@@ -54,6 +59,7 @@ public class Server2ControllerSegmentUploader implements
SegmentUploader {
_segmentUploadRequestTimeoutMs = segmentUploadRequestTimeoutMs;
_serverMetrics = serverMetrics;
_authProvider = authProvider;
+ _rawTableName = TableNameBuilder.extractRawTableName(tableName);
}
@Override
@@ -61,16 +67,20 @@ public class Server2ControllerSegmentUploader implements
SegmentUploader {
SegmentCompletionProtocol.Response response =
uploadSegmentToController(segmentFile);
if (response.getStatus() ==
SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS) {
try {
- return new URI(response.getSegmentLocation());
+ URI uri = new URI(response.getSegmentLocation());
+ _serverMetrics.addMeteredTableValue(_rawTableName,
ServerMeter.SEGMENT_UPLOAD_SUCCESS, 1);
+ return uri;
} catch (URISyntaxException e) {
_segmentLogger.error("Error in segment location format: ", e);
}
}
+ _serverMetrics.addMeteredTableValue(_rawTableName,
ServerMeter.SEGMENT_UPLOAD_FAILURE, 1);
return null;
}
public SegmentCompletionProtocol.Response uploadSegmentToController(File
segmentFile) {
SegmentCompletionProtocol.Response response;
+ long startTime = System.currentTimeMillis();
try {
String responseStr = _fileUploadDownloadClient
.uploadSegment(_controllerSegmentUploadCommitUrl, _segmentName,
segmentFile,
@@ -88,6 +98,10 @@ public class Server2ControllerSegmentUploader implements
SegmentUploader {
// hence unable to send {@link
SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER}
// If cache is not invalidated, we will not recover from exceptions
until the controller comes back up
ControllerLeaderLocator.getInstance().invalidateCachedControllerLeader();
+ } finally {
+ long duration = System.currentTimeMillis() - startTime;
+ _serverMetrics.addTimedTableValue(_rawTableName,
ServerTimer.SEGMENT_UPLOAD_TIME_MS, duration,
+ TimeUnit.MILLISECONDS);
}
SegmentCompletionProtocolUtils.raiseSegmentCompletionProtocolResponseMetric(_serverMetrics,
response);
return response;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
index a7547e9ab1..95e8917f00 100644
---
a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
@@ -159,7 +159,7 @@ public class ServerSegmentCompletionProtocolHandler {
try {
segmentUploader =
new Server2ControllerSegmentUploader(LOGGER,
_fileUploadDownloadClient, url, params.getSegmentName(),
- _segmentUploadRequestTimeoutMs, _serverMetrics, _authProvider);
+ _segmentUploadRequestTimeoutMs, _serverMetrics, _authProvider,
_rawTableName);
} catch (URISyntaxException e) {
LOGGER.error("Segment commit upload url error: ", e);
return SegmentCompletionProtocol.RESP_NOT_SENT;
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploaderTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploaderTest.java
index 62fa1a44ef..119b7cb7d1 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploaderTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploaderTest.java
@@ -28,11 +28,13 @@ import java.util.Map;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.exception.HttpErrorStatusException;
+import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.BasePinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.utils.StringUtil;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -42,6 +44,7 @@ public class PinotFSSegmentUploaderTest {
private static final int TIMEOUT_IN_MS = 100;
private File _file;
private LLCSegmentName _llcSegmentName;
+ private ServerMetrics _serverMetrics = Mockito.mock(ServerMetrics.class);
@BeforeClass
public void setUp()
@@ -61,7 +64,7 @@ public class PinotFSSegmentUploaderTest {
@Test
public void testSuccessfulUpload() {
- SegmentUploader segmentUploader = new
PinotFSSegmentUploader("hdfs://root", TIMEOUT_IN_MS);
+ SegmentUploader segmentUploader = new
PinotFSSegmentUploader("hdfs://root", TIMEOUT_IN_MS, _serverMetrics);
URI segmentURI = segmentUploader.uploadSegment(_file, _llcSegmentName);
Assert.assertTrue(segmentURI.toString().startsWith(StringUtil
.join(File.separator, "hdfs://root", _llcSegmentName.getTableName(),
_llcSegmentName.getSegmentName())));
@@ -69,7 +72,7 @@ public class PinotFSSegmentUploaderTest {
@Test
public void testSegmentAlreadyExist() {
- SegmentUploader segmentUploader = new
PinotFSSegmentUploader("existing://root", TIMEOUT_IN_MS);
+ SegmentUploader segmentUploader = new
PinotFSSegmentUploader("existing://root", TIMEOUT_IN_MS, _serverMetrics);
URI segmentURI = segmentUploader.uploadSegment(_file, _llcSegmentName);
Assert.assertTrue(segmentURI.toString().startsWith(StringUtil
.join(File.separator, "existing://root",
_llcSegmentName.getTableName(), _llcSegmentName.getSegmentName())));
@@ -77,14 +80,14 @@ public class PinotFSSegmentUploaderTest {
@Test
public void testUploadTimeOut() {
- SegmentUploader segmentUploader = new
PinotFSSegmentUploader("timeout://root", TIMEOUT_IN_MS);
+ SegmentUploader segmentUploader = new
PinotFSSegmentUploader("timeout://root", TIMEOUT_IN_MS, _serverMetrics);
URI segmentURI = segmentUploader.uploadSegment(_file, _llcSegmentName);
Assert.assertNull(segmentURI);
}
@Test
public void testNoSegmentStoreConfigured() {
- SegmentUploader segmentUploader = new PinotFSSegmentUploader("",
TIMEOUT_IN_MS);
+ SegmentUploader segmentUploader = new PinotFSSegmentUploader("",
TIMEOUT_IN_MS, _serverMetrics);
URI segmentURI = segmentUploader.uploadSegment(_file, _llcSegmentName);
Assert.assertNull(segmentURI);
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java
index 86a7088d94..3dbcfa506c 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java
@@ -91,7 +91,7 @@ public class Server2ControllerSegmentUploaderTest {
throws URISyntaxException {
Server2ControllerSegmentUploader uploader =
new Server2ControllerSegmentUploader(_logger,
_fileUploadDownloadClient, GOOD_CONTROLLER_VIP, "segmentName",
- 10000, mock(ServerMetrics.class), null);
+ 10000, mock(ServerMetrics.class), null,
_llcSegmentName.getTableName());
URI segmentURI = uploader.uploadSegment(_file, _llcSegmentName);
Assert.assertEquals(segmentURI.toString(), SEGMENT_LOCATION);
}
@@ -101,7 +101,7 @@ public class Server2ControllerSegmentUploaderTest {
throws URISyntaxException {
Server2ControllerSegmentUploader uploader =
new Server2ControllerSegmentUploader(_logger,
_fileUploadDownloadClient, BAD_CONTROLLER_VIP, "segmentName",
- 10000, mock(ServerMetrics.class), null);
+ 10000, mock(ServerMetrics.class), null,
_llcSegmentName.getTableName());
URI segmentURI = uploader.uploadSegment(_file, _llcSegmentName);
Assert.assertNull(segmentURI);
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 6806d16c10..6bfcf09952 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -66,6 +66,7 @@ import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
+import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -113,7 +114,7 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
_helixManager = helixManager;
_serverMetrics = serverMetrics;
_segmentUploader = new
PinotFSSegmentUploader(_instanceDataManagerConfig.getSegmentStoreUri(),
- PinotFSSegmentUploader.DEFAULT_SEGMENT_UPLOAD_TIMEOUT_MILLIS);
+
ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(),
_serverMetrics);
_externalViewDroppedMaxWaitMs =
_instanceDataManagerConfig.getExternalViewDroppedMaxWaitMs();
_externalViewDroppedCheckInternalMs =
_instanceDataManagerConfig.getExternalViewDroppedCheckIntervalMs();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]