This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ee4049e1e8 refine a few logs and checks for upsert (#13747)
ee4049e1e8 is described below

commit ee4049e1e80bc2521242dd523236f81549e03222
Author: Xiaobing <[email protected]>
AuthorDate: Tue Aug 6 17:34:06 2024 -0700

    refine a few logs and checks for upsert (#13747)
---
 .../manager/realtime/RealtimeTableDataManager.java    | 10 ++--------
 .../upsert/BasePartitionUpsertMetadataManager.java    |  6 ++++--
 .../local/upsert/BaseTableUpsertMetadataManager.java  | 19 +++++++++++++------
 .../local/upsert/TableUpsertMetadataManager.java      |  2 ++
 4 files changed, 21 insertions(+), 16 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 2db6a5dbd0..75e8a4c235 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -383,17 +383,11 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
         && _tableUpsertMetadataManager.getUpsertMode() == 
UpsertConfig.Mode.PARTIAL;
   }
 
-  private boolean isUpsertPreloadEnabled() {
-    UpsertConfig upsertConfig = _tableConfig.getUpsertConfig();
-    return _tableUpsertMetadataManager != null && _segmentPreloadExecutor != 
null && upsertConfig != null
-        && upsertConfig.isEnableSnapshot() && upsertConfig.isEnablePreload();
-  }
-
   /**
-   * Handles upsert preload, and returns whether the upsert preload is enabled.
+   * Handles upsert preload if the upsert preload is enabled.
    */
   private void handleUpsertPreload(SegmentZKMetadata zkMetadata, 
IndexLoadingConfig indexLoadingConfig) {
-    if (!isUpsertPreloadEnabled()) {
+    if (_tableUpsertMetadataManager == null || 
!_tableUpsertMetadataManager.isEnablePreload()) {
       return;
     }
     String segmentName = zkMetadata.getSegmentName();
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index f03c9e8629..318187e40b 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -162,7 +162,7 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
     _partialUpsertHandler = context.getPartialUpsertHandler();
     _enableSnapshot = context.isSnapshotEnabled();
     _snapshotLock = _enableSnapshot ? new ReentrantReadWriteLock() : null;
-    _isPreloading = _enableSnapshot && context.isPreloadEnabled();
+    _isPreloading = context.isPreloadEnabled();
     _metadataTTL = context.getMetadataTTL();
     _deletedKeysTTL = context.getDeletedKeysTTL();
     _tableIndexDir = context.getTableIndexDir();
@@ -933,6 +933,7 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
 
     int numImmutableSegments = 0;
     int numConsumingSegments = 0;
+    int numUnchangedSegments = 0;
     // The segments without validDocIds snapshots should take their snapshots 
at last. So that when there is failure
     // to take snapshots, the validDocIds snapshot on disk still keep track of 
an exclusive set of valid docs across
     // segments. Because the valid docs as tracked by the existing validDocIds 
snapshots can only get less. That no
@@ -945,6 +946,7 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
       }
       if (!_updatedSegmentsSinceLastSnapshot.contains(segment)) {
         // if no updates since last snapshot then skip
+        numUnchangedSegments++;
         continue;
       }
       try {
@@ -976,7 +978,7 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
         ServerGauge.UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT, numImmutableSegments);
     _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId,
         ServerGauge.UPSERT_PRIMARY_KEYS_IN_SNAPSHOT_COUNT, 
numPrimaryKeysInSnapshot);
-    int numMissedSegments = numTrackedSegments - numImmutableSegments - 
numConsumingSegments;
+    int numMissedSegments = numTrackedSegments - numImmutableSegments - 
numConsumingSegments - numUnchangedSegments;
     if (numMissedSegments > 0) {
       _serverMetrics.addMeteredTableValue(_tableNameWithType, 
String.valueOf(_partitionId),
           ServerMeter.UPSERT_MISSED_VALID_DOC_ID_SNAPSHOT_COUNT, 
numMissedSegments);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 781b8b445c..6d77bbc535 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -40,6 +40,7 @@ public abstract class BaseTableUpsertMetadataManager 
implements TableUpsertMetad
   protected String _tableNameWithType;
   protected UpsertContext _context;
   protected UpsertConfig.ConsistencyMode _consistencyMode;
+  protected boolean _enablePreload;
 
   @Override
   public void init(TableConfig tableConfig, Schema schema, TableDataManager 
tableDataManager) {
@@ -66,7 +67,8 @@ public abstract class BaseTableUpsertMetadataManager 
implements TableUpsertMetad
     String deleteRecordColumn = upsertConfig.getDeleteRecordColumn();
     HashFunction hashFunction = upsertConfig.getHashFunction();
     boolean enableSnapshot = upsertConfig.isEnableSnapshot();
-    boolean enablePreload = upsertConfig.isEnablePreload();
+    _enablePreload =
+        enableSnapshot && upsertConfig.isEnablePreload() && 
tableDataManager.getSegmentPreloadExecutor() != null;
     double metadataTTL = upsertConfig.getMetadataTTL();
     double deletedKeysTTL = upsertConfig.getDeletedKeysTTL();
     _consistencyMode = upsertConfig.getConsistencyMode();
@@ -78,16 +80,16 @@ public abstract class BaseTableUpsertMetadataManager 
implements TableUpsertMetad
     _context = new 
UpsertContext.Builder().setTableConfig(tableConfig).setSchema(schema)
         
.setPrimaryKeyColumns(primaryKeyColumns).setComparisonColumns(comparisonColumns)
         
.setDeleteRecordColumn(deleteRecordColumn).setHashFunction(hashFunction)
-        
.setPartialUpsertHandler(partialUpsertHandler).setEnableSnapshot(enableSnapshot).setEnablePreload(enablePreload)
-        
.setMetadataTTL(metadataTTL).setDeletedKeysTTL(deletedKeysTTL).setConsistencyMode(_consistencyMode)
-        
.setUpsertViewRefreshIntervalMs(upsertViewRefreshIntervalMs).setTableIndexDir(tableIndexDir)
-        .setTableDataManager(tableDataManager).build();
+        
.setPartialUpsertHandler(partialUpsertHandler).setEnableSnapshot(enableSnapshot)
+        
.setEnablePreload(_enablePreload).setMetadataTTL(metadataTTL).setDeletedKeysTTL(deletedKeysTTL)
+        
.setConsistencyMode(_consistencyMode).setUpsertViewRefreshIntervalMs(upsertViewRefreshIntervalMs)
+        
.setTableIndexDir(tableIndexDir).setTableDataManager(tableDataManager).build();
     LOGGER.info(
         "Initialized {} for table: {} with primary key columns: {}, comparison 
columns: {}, delete record column: {},"
             + " hash function: {}, upsert mode: {}, enable snapshot: {}, 
enable preload: {}, metadata TTL: {},"
             + " deleted Keys TTL: {}, consistency mode: {}, upsert view 
refresh interval: {}ms, table index dir: {}",
         getClass().getSimpleName(), _tableNameWithType, primaryKeyColumns, 
comparisonColumns, deleteRecordColumn,
-        hashFunction, upsertConfig.getMode(), enableSnapshot, enablePreload, 
metadataTTL, deletedKeysTTL,
+        hashFunction, upsertConfig.getMode(), enableSnapshot, _enablePreload, 
metadataTTL, deletedKeysTTL,
         _consistencyMode, upsertViewRefreshIntervalMs, tableIndexDir);
 
     initCustomVariables();
@@ -104,4 +106,9 @@ public abstract class BaseTableUpsertMetadataManager 
implements TableUpsertMetad
   public UpsertConfig.Mode getUpsertMode() {
     return _context.getPartialUpsertHandler() == null ? UpsertConfig.Mode.FULL 
: UpsertConfig.Mode.PARTIAL;
   }
+
+  @Override
+  public boolean isEnablePreload() {
+    return _enablePreload;
+  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
index db21f2030e..123e3db553 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
@@ -41,6 +41,8 @@ public interface TableUpsertMetadataManager extends Closeable 
{
 
   UpsertConfig.Mode getUpsertMode();
 
+  boolean isEnablePreload();
+
   /**
    * Stops the metadata manager. After invoking this method, no access to the 
metadata will be accepted.
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to