This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 75a4bc3f39 Allow custom operations before upsert preload starts (#11265)
75a4bc3f39 is described below

commit 75a4bc3f39ef825665b368929520a543b74bfd54
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Aug 4 06:03:48 2023 -0700

    Allow custom operations before upsert preload starts (#11265)
---
 .../upsert/BaseTableUpsertMetadataManager.java     | 51 ++++++++++++++++------
 1 file changed, 37 insertions(+), 14 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 148248b0be..2fecc4fbe5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -57,9 +57,10 @@ import org.slf4j.LoggerFactory;
 @ThreadSafe
 public abstract class BaseTableUpsertMetadataManager implements 
TableUpsertMetadataManager {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(BaseTableUpsertMetadataManager.class);
-  private TableConfig _tableConfig;
-  private Schema _schema;
-  private TableDataManager _tableDataManager;
+
+  protected TableConfig _tableConfig;
+  protected Schema _schema;
+  protected TableDataManager _tableDataManager;
   protected String _tableNameWithType;
   protected List<String> _primaryKeyColumns;
   protected List<String> _comparisonColumns;
@@ -70,6 +71,9 @@ public abstract class BaseTableUpsertMetadataManager 
implements TableUpsertMetad
   protected double _metadataTTL;
   protected File _tableIndexDir;
   protected ServerMetrics _serverMetrics;
+  protected HelixManager _helixManager;
+  protected ExecutorService _segmentPreloadExecutor;
+
   private volatile boolean _isPreloading = false;
 
   @Override
@@ -109,6 +113,11 @@ public abstract class BaseTableUpsertMetadataManager 
implements TableUpsertMetad
     _metadataTTL = upsertConfig.getMetadataTTL();
     _tableIndexDir = tableDataManager.getTableDataDir();
     _serverMetrics = serverMetrics;
+    _helixManager = helixManager;
+    _segmentPreloadExecutor = segmentPreloadExecutor;
+
+    initCustomVariables();
+
     if (_enableSnapshot && segmentPreloadExecutor != null && 
upsertConfig.isEnablePreload()) {
       // Preloading the segments with snapshots for fast upsert metadata 
recovery.
       // Note that there is an implicit waiting logic between the thread doing 
the segment preloading here and the
@@ -119,7 +128,7 @@ public abstract class BaseTableUpsertMetadataManager 
implements TableUpsertMetad
       // happens as the lambda of ConcurrentHashMap.computeIfAbsent() method, 
which ensures the waiting logic.
       try {
         _isPreloading = true;
-        preloadSegments(helixManager, segmentPreloadExecutor);
+        preloadSegments();
       } catch (Exception e) {
         // Even if preloading fails, we should continue to complete the 
initialization, so that TableDataManager can be
         // created. Once TableDataManager is created, no more segment 
preloading would happen, and the normal segment
@@ -136,16 +145,24 @@ public abstract class BaseTableUpsertMetadataManager 
implements TableUpsertMetad
     }
   }
 
+  /**
+   * Can be overridden to initialize custom variables after other variables 
are set but before preload starts. This is
+   * needed because preload will load segments which might require these 
custom variables.
+   */
+  protected void initCustomVariables() {
+  }
+
   /**
    * Get the ideal state and find segments assigned to current instance, then 
preload those with validDocIds snapshot.
    * Skip those without the snapshots and those whose crc has changed, as they 
will be handled by normal Helix state
    * transitions, which will proceed after the preloading phase fully 
completes.
    */
-  private void preloadSegments(HelixManager helixManager, ExecutorService 
segmentPreloadExecutor)
+  private void preloadSegments()
       throws Exception {
     LOGGER.info("Preload segments from table: {} for fast upsert metadata 
recovery", _tableNameWithType);
-    IdealState idealState = HelixHelper.getTableIdealState(helixManager, 
_tableNameWithType);
-    ZkHelixPropertyStore<ZNRecord> propertyStore = 
helixManager.getHelixPropertyStore();
+    onPreloadStart();
+    IdealState idealState = HelixHelper.getTableIdealState(_helixManager, 
_tableNameWithType);
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
_helixManager.getHelixPropertyStore();
     String instanceId = getInstanceId();
     IndexLoadingConfig indexLoadingConfig = createIndexLoadingConfig();
     List<Future<?>> futures = new ArrayList<>();
@@ -156,7 +173,7 @@ public abstract class BaseTableUpsertMetadataManager 
implements TableUpsertMetad
         LOGGER.info("Skip segment: {} as its ideal state: {} is not ONLINE", 
segmentName, state);
         continue;
       }
-      futures.add(segmentPreloadExecutor.submit(() -> {
+      futures.add(_segmentPreloadExecutor.submit(() -> {
         try {
           preloadSegment(segmentName, indexLoadingConfig, propertyStore);
         } catch (Exception e) {
@@ -179,6 +196,18 @@ public abstract class BaseTableUpsertMetadataManager 
implements TableUpsertMetad
     LOGGER.info("Preloaded segments from table: {} for fast upsert metadata 
recovery", _tableNameWithType);
   }
 
+  /**
+   * Can be overridden to perform operations before preload starts.
+   */
+  protected void onPreloadStart() {
+  }
+
+  /**
+   * Can be overridden to perform operations after preload is done.
+   */
+  protected void onPreloadFinish() {
+  }
+
   private String getInstanceId() {
     InstanceDataManagerConfig instanceDataManagerConfig =
         
_tableDataManager.getTableDataManagerConfig().getInstanceDataManagerConfig();
@@ -226,12 +255,6 @@ public abstract class BaseTableUpsertMetadataManager 
implements TableUpsertMetad
     return new File(SegmentDirectoryPaths.findSegmentDirectory(indexDir), 
V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME);
   }
 
-  /**
-   * Can be overridden to perform operations after preload is done.
-   */
-  protected void onPreloadFinish() {
-  }
-
   @Override
   public boolean isPreloading() {
     return _isPreloading;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to