This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 96abef8374 Add a server level config for segment server upload to deep
store. (#14093)
96abef8374 is described below
commit 96abef83747b6b9cbe31c69409e74df249bff4d7
Author: RAGHVENDRA KUMAR YADAV <[email protected]>
AuthorDate: Mon Sep 30 10:41:40 2024 -0700
Add a server level config for segment server upload to deep store. (#14093)
---
.../manager/realtime/SegmentCommitterFactory.java | 10 +++++++++-
.../realtime/SegmentCommitterFactoryTest.java | 20 ++++++++++++++++----
.../helix/HelixInstanceDataManagerConfig.java | 8 ++++++++
.../config/instance/InstanceDataManagerConfig.java | 2 ++
.../org/apache/pinot/spi/stream/StreamConfig.java | 22 ++++++++++++----------
5 files changed, 47 insertions(+), 15 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
index f8315b99d9..33a3b55654 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
@@ -23,6 +23,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
@@ -54,7 +55,14 @@ public class SegmentCommitterFactory {
public SegmentCommitter
createSegmentCommitter(SegmentCompletionProtocol.Request.Params params,
String controllerVipUrl)
throws URISyntaxException {
- boolean uploadToFs = _streamConfig.isServerUploadToDeepStore();
+ InstanceDataManagerConfig instanceDataManagerConfig =
_indexLoadingConfig.getInstanceDataManagerConfig();
+
+ boolean uploadToFs =
instanceDataManagerConfig.isUploadSegmentToDeepStore();
+ Boolean streamConfigServerUploadToDeepStore =
_streamConfig.isServerUploadToDeepStore();
+ if (streamConfigServerUploadToDeepStore != null) {
+ uploadToFs = streamConfigServerUploadToDeepStore;
+ }
+
String peerSegmentDownloadScheme =
_tableConfig.getValidationConfig().getPeerSegmentDownloadScheme();
String segmentStoreUri = _indexLoadingConfig.getSegmentStoreURI();
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java
index 65a2b16b96..a3a5fe91dc 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java
@@ -27,8 +27,10 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.mockito.Mockito;
@@ -62,9 +64,10 @@ public class SegmentCommitterFactoryTest {
ServerSegmentCompletionProtocolHandler protocolHandler =
new
ServerSegmentCompletionProtocolHandler(Mockito.mock(ServerMetrics.class),
"test_REALTIME");
String controllerVipUrl = "http://localhost:1234";
+ IndexLoadingConfig indexLoadingConfig = mockIndexLoadConfig();
SegmentCompletionProtocol.Request.Params requestParams = new
SegmentCompletionProtocol.Request.Params();
SegmentCommitterFactory factory = new
SegmentCommitterFactory(Mockito.mock(Logger.class), protocolHandler, config,
- Mockito.mock(IndexLoadingConfig.class),
Mockito.mock(ServerMetrics.class));
+ indexLoadingConfig, Mockito.mock(ServerMetrics.class));
SegmentCommitter committer = factory.createSegmentCommitter(requestParams,
controllerVipUrl);
Assert.assertNotNull(committer);
Assert.assertTrue(committer instanceof SplitSegmentCommitter);
@@ -83,9 +86,8 @@ public class SegmentCommitterFactoryTest {
Map<String, String> streamConfigMap = new
HashMap<>(getMinimumStreamConfigMap());
streamConfigMap.put(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE,
"true");
TableConfig config = createRealtimeTableConfig("testDeepStoreConfig",
streamConfigMap).build();
- IndexLoadingConfig indexLoadingConfig =
Mockito.mock(IndexLoadingConfig.class);
-
Mockito.when(indexLoadingConfig.getSegmentStoreURI()).thenReturn("file:///path/to/segment/store.txt");
-
+ // Create and set up the mocked IndexLoadingConfig and InstanceDataManagerConfig
+ IndexLoadingConfig indexLoadingConfig = mockIndexLoadConfig();
SegmentCommitterFactory factory = new
SegmentCommitterFactory(Mockito.mock(Logger.class), protocolHandler, config,
indexLoadingConfig, Mockito.mock(ServerMetrics.class));
SegmentCommitter committer = factory.createSegmentCommitter(requestParams,
controllerVipUrl);
@@ -107,4 +109,14 @@ public class SegmentCommitterFactoryTest {
Assert.assertTrue(committer instanceof SplitSegmentCommitter);
Assert.assertTrue(((SplitSegmentCommitter) committer).getSegmentUploader()
instanceof PinotFSSegmentUploader);
}
+
+ private IndexLoadingConfig mockIndexLoadConfig() {
+ IndexLoadingConfig indexLoadingConfig =
Mockito.mock(IndexLoadingConfig.class);
+ InstanceDataManagerConfig instanceDataManagerConfig =
Mockito.mock(InstanceDataManagerConfig.class);
+
Mockito.when(indexLoadingConfig.getInstanceDataManagerConfig()).thenReturn(instanceDataManagerConfig);
+ PinotConfiguration pinotConfiguration =
Mockito.mock(PinotConfiguration.class);
+
Mockito.when(instanceDataManagerConfig.getConfig()).thenReturn(pinotConfiguration);
+
+ return indexLoadingConfig;
+ }
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
index 198cbb0434..ea90b9a6b7 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
@@ -135,6 +135,9 @@ public class HelixInstanceDataManagerConfig implements
InstanceDataManagerConfig
private static final String EXTERNAL_VIEW_DROPPED_MAX_WAIT_MS =
"external.view.dropped.max.wait.ms";
private static final String EXTERNAL_VIEW_DROPPED_CHECK_INTERVAL_MS =
"external.view.dropped.check.interval.ms";
+ public static final String UPLOAD_SEGMENT_TO_DEEP_STORE =
"segment.upload.to.deep.store";
+ public static final boolean DEFAULT_UPLOAD_SEGMENT_TO_DEEP_STORE = false;
+
private final static String[] REQUIRED_KEYS = {INSTANCE_ID};
private static final long DEFAULT_ERROR_CACHE_SIZE = 100L;
private static final int DEFAULT_DELETED_SEGMENTS_CACHE_SIZE = 10_000;
@@ -330,4 +333,9 @@ public class HelixInstanceDataManagerConfig implements
InstanceDataManagerConfig
public Map<String, Map<String, String>> getTierConfigs() {
return _tierConfigs;
}
+
+ @Override
+ public boolean isUploadSegmentToDeepStore() {
+ return _serverConfig.getProperty(UPLOAD_SEGMENT_TO_DEEP_STORE,
DEFAULT_UPLOAD_SEGMENT_TO_DEEP_STORE);
+ }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
index d3addef83c..152e47ac95 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
@@ -78,4 +78,6 @@ public interface InstanceDataManagerConfig {
PinotConfiguration getAuthConfig();
Map<String, Map<String, String>> getTierConfigs();
+
+ boolean isUploadSegmentToDeepStore();
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
index c0fc6c891d..f7390f5642 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.spi.utils.DataSizeUtils;
import org.apache.pinot.spi.utils.TimeUtils;
@@ -41,7 +42,6 @@ public class StreamConfig {
public static final long DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS =
TimeUnit.MILLISECONDS.convert(6, TimeUnit.HOURS);
public static final long DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES = 200 *
1024 * 1024; // 200M
public static final int DEFAULT_FLUSH_AUTOTUNE_INITIAL_ROWS = 100_000;
- public static final String DEFAULT_SERVER_UPLOAD_TO_DEEPSTORE = "false";
public static final String DEFAULT_CONSUMER_FACTORY_CLASS_NAME_STRING =
"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory";
@@ -79,10 +79,11 @@ public class StreamConfig {
// Allow overriding it to use different offset criteria
private OffsetCriteria _offsetCriteria;
- // Indicates if the segment should be uploaded to the deep store's file
system or to the controller during the
- // segment commit protocol. By default, segment is uploaded to the
controller during commit.
- // If this flag is set to true, the segment is uploaded to deep store.
- private final boolean _serverUploadToDeepStore;
+ // Table-level StreamConfig flag indicating whether the segment should be uploaded to the
deep store's file system or to the
+ // controller during the segment commit protocol. If this config is not present
in the table's StreamConfig,
+ // _serverUploadToDeepStore is null, and isServerUploadToDeepStore()
falls back to the server-level
+ // config for the default value.
+ private final Boolean _serverUploadToDeepStore;
/**
* Initializes a StreamConfig using the map of stream configs from the table
config
@@ -175,9 +176,9 @@ public class StreamConfig {
_flushThresholdSegmentRows =
extractFlushThresholdSegmentRows(streamConfigMap);
_flushThresholdTimeMillis =
extractFlushThresholdTimeMillis(streamConfigMap);
_flushThresholdSegmentSizeBytes =
extractFlushThresholdSegmentSize(streamConfigMap);
- _serverUploadToDeepStore = Boolean.parseBoolean(
-
streamConfigMap.getOrDefault(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE,
- DEFAULT_SERVER_UPLOAD_TO_DEEPSTORE));
+ _serverUploadToDeepStore =
streamConfigMap.containsKey(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE)
+ ?
Boolean.valueOf(streamConfigMap.get(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE))
+ : null;
int autotuneInitialRows = 0;
String initialRowsValue =
streamConfigMap.get(StreamConfigProperties.SEGMENT_FLUSH_AUTOTUNE_INITIAL_ROWS);
@@ -214,7 +215,8 @@ public class StreamConfig {
}
}
- public boolean isServerUploadToDeepStore() {
+ @Nullable
+ public Boolean isServerUploadToDeepStore() {
return _serverUploadToDeepStore;
}
@@ -416,7 +418,7 @@ public class StreamConfig {
&& _flushThresholdSegmentSizeBytes ==
that._flushThresholdSegmentSizeBytes
&& _flushAutotuneInitialRows == that._flushAutotuneInitialRows
&& Double.compare(_topicConsumptionRateLimit,
that._topicConsumptionRateLimit) == 0
- && _serverUploadToDeepStore == that._serverUploadToDeepStore &&
Objects.equals(_type, that._type)
+ && Objects.equals(_serverUploadToDeepStore,
that._serverUploadToDeepStore) && Objects.equals(_type, that._type)
&& Objects.equals(_topicName, that._topicName) &&
Objects.equals(_tableNameWithType, that._tableNameWithType)
&& Objects.equals(_consumerFactoryClassName,
that._consumerFactoryClassName) && Objects.equals(_decoderClass,
that._decoderClass) && Objects.equals(_decoderProperties,
that._decoderProperties) && Objects.equals(_groupId,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]