This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 96abef8374 Add a server level config for segment server upload to deep 
store. (#14093)
96abef8374 is described below

commit 96abef83747b6b9cbe31c69409e74df249bff4d7
Author: RAGHVENDRA KUMAR YADAV <[email protected]>
AuthorDate: Mon Sep 30 10:41:40 2024 -0700

    Add a server level config for segment server upload to deep store. (#14093)
---
 .../manager/realtime/SegmentCommitterFactory.java  | 10 +++++++++-
 .../realtime/SegmentCommitterFactoryTest.java      | 20 ++++++++++++++++----
 .../helix/HelixInstanceDataManagerConfig.java      |  8 ++++++++
 .../config/instance/InstanceDataManagerConfig.java |  2 ++
 .../org/apache/pinot/spi/stream/StreamConfig.java  | 22 ++++++++++++----------
 5 files changed, 47 insertions(+), 15 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
index f8315b99d9..33a3b55654 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
@@ -23,6 +23,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
@@ -54,7 +55,14 @@ public class SegmentCommitterFactory {
   public SegmentCommitter 
createSegmentCommitter(SegmentCompletionProtocol.Request.Params params,
       String controllerVipUrl)
       throws URISyntaxException {
-    boolean uploadToFs = _streamConfig.isServerUploadToDeepStore();
+    InstanceDataManagerConfig instanceDataManagerConfig = 
_indexLoadingConfig.getInstanceDataManagerConfig();
+
+    boolean uploadToFs = 
instanceDataManagerConfig.isUploadSegmentToDeepStore();
+    Boolean streamConfigServerUploadToDeepStore = 
_streamConfig.isServerUploadToDeepStore();
+    if (streamConfigServerUploadToDeepStore != null) {
+      uploadToFs = streamConfigServerUploadToDeepStore;
+    }
+
     String peerSegmentDownloadScheme = 
_tableConfig.getValidationConfig().getPeerSegmentDownloadScheme();
     String segmentStoreUri = _indexLoadingConfig.getSegmentStoreURI();
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java
index 65a2b16b96..a3a5fe91dc 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java
@@ -27,8 +27,10 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.mockito.Mockito;
@@ -62,9 +64,10 @@ public class SegmentCommitterFactoryTest {
     ServerSegmentCompletionProtocolHandler protocolHandler =
         new 
ServerSegmentCompletionProtocolHandler(Mockito.mock(ServerMetrics.class), 
"test_REALTIME");
     String controllerVipUrl = "http://localhost:1234";
+    IndexLoadingConfig indexLoadingConfig = mockIndexLoadConfig();
     SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
     SegmentCommitterFactory factory = new 
SegmentCommitterFactory(Mockito.mock(Logger.class), protocolHandler, config,
-        Mockito.mock(IndexLoadingConfig.class), 
Mockito.mock(ServerMetrics.class));
+        indexLoadingConfig, Mockito.mock(ServerMetrics.class));
     SegmentCommitter committer = factory.createSegmentCommitter(requestParams, 
controllerVipUrl);
     Assert.assertNotNull(committer);
     Assert.assertTrue(committer instanceof SplitSegmentCommitter);
@@ -83,9 +86,8 @@ public class SegmentCommitterFactoryTest {
     Map<String, String> streamConfigMap = new 
HashMap<>(getMinimumStreamConfigMap());
     streamConfigMap.put(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE, 
"true");
     TableConfig config = createRealtimeTableConfig("testDeepStoreConfig", 
streamConfigMap).build();
-    IndexLoadingConfig indexLoadingConfig = 
Mockito.mock(IndexLoadingConfig.class);
-    
Mockito.when(indexLoadingConfig.getSegmentStoreURI()).thenReturn("file:///path/to/segment/store.txt");
-
+    // Create and set up the mocked IndexLoadingConfig and InstanceDataManagerConfig
+    IndexLoadingConfig indexLoadingConfig = mockIndexLoadConfig();
     SegmentCommitterFactory factory = new 
SegmentCommitterFactory(Mockito.mock(Logger.class), protocolHandler, config,
         indexLoadingConfig, Mockito.mock(ServerMetrics.class));
     SegmentCommitter committer = factory.createSegmentCommitter(requestParams, 
controllerVipUrl);
@@ -107,4 +109,14 @@ public class SegmentCommitterFactoryTest {
     Assert.assertTrue(committer instanceof SplitSegmentCommitter);
     Assert.assertTrue(((SplitSegmentCommitter) committer).getSegmentUploader() 
instanceof PinotFSSegmentUploader);
   }
+
+  private IndexLoadingConfig mockIndexLoadConfig() {
+    IndexLoadingConfig indexLoadingConfig = 
Mockito.mock(IndexLoadingConfig.class);
+    InstanceDataManagerConfig instanceDataManagerConfig = 
Mockito.mock(InstanceDataManagerConfig.class);
+    
Mockito.when(indexLoadingConfig.getInstanceDataManagerConfig()).thenReturn(instanceDataManagerConfig);
+    PinotConfiguration pinotConfiguration = 
Mockito.mock(PinotConfiguration.class);
+    
Mockito.when(instanceDataManagerConfig.getConfig()).thenReturn(pinotConfiguration);
+
+    return indexLoadingConfig;
+  }
 }
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
index 198cbb0434..ea90b9a6b7 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
@@ -135,6 +135,9 @@ public class HelixInstanceDataManagerConfig implements 
InstanceDataManagerConfig
   private static final String EXTERNAL_VIEW_DROPPED_MAX_WAIT_MS = 
"external.view.dropped.max.wait.ms";
   private static final String EXTERNAL_VIEW_DROPPED_CHECK_INTERVAL_MS = 
"external.view.dropped.check.interval.ms";
 
+  public static final String UPLOAD_SEGMENT_TO_DEEP_STORE = 
"segment.upload.to.deep.store";
+  public static final boolean DEFAULT_UPLOAD_SEGMENT_TO_DEEP_STORE = false;
+
   private final static String[] REQUIRED_KEYS = {INSTANCE_ID};
   private static final long DEFAULT_ERROR_CACHE_SIZE = 100L;
   private static final int DEFAULT_DELETED_SEGMENTS_CACHE_SIZE = 10_000;
@@ -330,4 +333,9 @@ public class HelixInstanceDataManagerConfig implements 
InstanceDataManagerConfig
   public Map<String, Map<String, String>> getTierConfigs() {
     return _tierConfigs;
   }
+
+  @Override
+  public boolean isUploadSegmentToDeepStore() {
+    return _serverConfig.getProperty(UPLOAD_SEGMENT_TO_DEEP_STORE, 
DEFAULT_UPLOAD_SEGMENT_TO_DEEP_STORE);
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
index d3addef83c..152e47ac95 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
@@ -78,4 +78,6 @@ public interface InstanceDataManagerConfig {
   PinotConfiguration getAuthConfig();
 
   Map<String, Map<String, String>> getTierConfigs();
+
+  boolean isUploadSegmentToDeepStore();
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
index c0fc6c891d..f7390f5642 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.spi.utils.DataSizeUtils;
 import org.apache.pinot.spi.utils.TimeUtils;
@@ -41,7 +42,6 @@ public class StreamConfig {
   public static final long DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS = 
TimeUnit.MILLISECONDS.convert(6, TimeUnit.HOURS);
   public static final long DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES = 200 * 
1024 * 1024; // 200M
   public static final int DEFAULT_FLUSH_AUTOTUNE_INITIAL_ROWS = 100_000;
-  public static final String DEFAULT_SERVER_UPLOAD_TO_DEEPSTORE = "false";
 
   public static final String DEFAULT_CONSUMER_FACTORY_CLASS_NAME_STRING =
       "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory";
@@ -79,10 +79,11 @@ public class StreamConfig {
   // Allow overriding it to use different offset criteria
   private OffsetCriteria _offsetCriteria;
 
-  // Indicates if the segment should be uploaded to the deep store's file 
system or to the controller during the
-  // segment commit protocol. By default, segment is uploaded to the 
controller during commit.
-  // If this flag is set to true, the segment is uploaded to deep store.
-  private final boolean _serverUploadToDeepStore;
+  // Table-level StreamConfig flag indicating whether the segment should be uploaded to the deep store's file
+  // system (true) or to the controller (false) during the segment commit protocol. If the flag is not present in
+  // the table's StreamConfig, _serverUploadToDeepStore is null, isServerUploadToDeepStore() returns null, and the
+  // caller (SegmentCommitterFactory) falls back to the server-level config.
+  private final Boolean _serverUploadToDeepStore;
 
   /**
    * Initializes a StreamConfig using the map of stream configs from the table 
config
@@ -175,9 +176,9 @@ public class StreamConfig {
     _flushThresholdSegmentRows = 
extractFlushThresholdSegmentRows(streamConfigMap);
     _flushThresholdTimeMillis = 
extractFlushThresholdTimeMillis(streamConfigMap);
     _flushThresholdSegmentSizeBytes = 
extractFlushThresholdSegmentSize(streamConfigMap);
-    _serverUploadToDeepStore = Boolean.parseBoolean(
-        
streamConfigMap.getOrDefault(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE,
-            DEFAULT_SERVER_UPLOAD_TO_DEEPSTORE));
+    _serverUploadToDeepStore = 
streamConfigMap.containsKey(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE)
+        ? 
Boolean.valueOf(streamConfigMap.get(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE))
+        : null;
 
     int autotuneInitialRows = 0;
     String initialRowsValue = 
streamConfigMap.get(StreamConfigProperties.SEGMENT_FLUSH_AUTOTUNE_INITIAL_ROWS);
@@ -214,7 +215,8 @@ public class StreamConfig {
     }
   }
 
-  public boolean isServerUploadToDeepStore() {
+  @Nullable
+  public Boolean isServerUploadToDeepStore() {
     return _serverUploadToDeepStore;
   }
 
@@ -416,7 +418,7 @@ public class StreamConfig {
         && _flushThresholdSegmentSizeBytes == 
that._flushThresholdSegmentSizeBytes
         && _flushAutotuneInitialRows == that._flushAutotuneInitialRows
         && Double.compare(_topicConsumptionRateLimit, 
that._topicConsumptionRateLimit) == 0
-        && _serverUploadToDeepStore == that._serverUploadToDeepStore && 
Objects.equals(_type, that._type)
+        && Objects.equals(_serverUploadToDeepStore, 
that._serverUploadToDeepStore) && Objects.equals(_type, that._type)
         && Objects.equals(_topicName, that._topicName) && 
Objects.equals(_tableNameWithType, that._tableNameWithType)
         && Objects.equals(_consumerFactoryClassName, 
that._consumerFactoryClassName) && Objects.equals(_decoderClass,
         that._decoderClass) && Objects.equals(_decoderProperties, 
that._decoderProperties) && Objects.equals(_groupId,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to