This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ee4049e1e8 refine a few logs and checks for upsert (#13747)
ee4049e1e8 is described below
commit ee4049e1e80bc2521242dd523236f81549e03222
Author: Xiaobing <[email protected]>
AuthorDate: Tue Aug 6 17:34:06 2024 -0700
refine a few logs and checks for upsert (#13747)
---
.../manager/realtime/RealtimeTableDataManager.java | 10 ++--------
.../upsert/BasePartitionUpsertMetadataManager.java | 6 ++++--
.../local/upsert/BaseTableUpsertMetadataManager.java | 19 +++++++++++++------
.../local/upsert/TableUpsertMetadataManager.java | 2 ++
4 files changed, 21 insertions(+), 16 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 2db6a5dbd0..75e8a4c235 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -383,17 +383,11 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
&& _tableUpsertMetadataManager.getUpsertMode() ==
UpsertConfig.Mode.PARTIAL;
}
- private boolean isUpsertPreloadEnabled() {
- UpsertConfig upsertConfig = _tableConfig.getUpsertConfig();
- return _tableUpsertMetadataManager != null && _segmentPreloadExecutor !=
null && upsertConfig != null
- && upsertConfig.isEnableSnapshot() && upsertConfig.isEnablePreload();
- }
-
/**
- * Handles upsert preload, and returns whether the upsert preload is enabled.
+ * Handles upsert preload if the upsert preload is enabled.
*/
private void handleUpsertPreload(SegmentZKMetadata zkMetadata,
IndexLoadingConfig indexLoadingConfig) {
- if (!isUpsertPreloadEnabled()) {
+ if (_tableUpsertMetadataManager == null ||
!_tableUpsertMetadataManager.isEnablePreload()) {
return;
}
String segmentName = zkMetadata.getSegmentName();
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index f03c9e8629..318187e40b 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -162,7 +162,7 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
_partialUpsertHandler = context.getPartialUpsertHandler();
_enableSnapshot = context.isSnapshotEnabled();
_snapshotLock = _enableSnapshot ? new ReentrantReadWriteLock() : null;
- _isPreloading = _enableSnapshot && context.isPreloadEnabled();
+ _isPreloading = context.isPreloadEnabled();
_metadataTTL = context.getMetadataTTL();
_deletedKeysTTL = context.getDeletedKeysTTL();
_tableIndexDir = context.getTableIndexDir();
@@ -933,6 +933,7 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
int numImmutableSegments = 0;
int numConsumingSegments = 0;
+ int numUnchangedSegments = 0;
// The segments without validDocIds snapshots should take their snapshots
at last. So that when there is failure
// to take snapshots, the validDocIds snapshot on disk still keep track of
an exclusive set of valid docs across
// segments. Because the valid docs as tracked by the existing validDocIds
snapshots can only get less. That no
@@ -945,6 +946,7 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
}
if (!_updatedSegmentsSinceLastSnapshot.contains(segment)) {
// if no updates since last snapshot then skip
+ numUnchangedSegments++;
continue;
}
try {
@@ -976,7 +978,7 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
ServerGauge.UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT, numImmutableSegments);
_serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId,
ServerGauge.UPSERT_PRIMARY_KEYS_IN_SNAPSHOT_COUNT,
numPrimaryKeysInSnapshot);
- int numMissedSegments = numTrackedSegments - numImmutableSegments -
numConsumingSegments;
+ int numMissedSegments = numTrackedSegments - numImmutableSegments -
numConsumingSegments - numUnchangedSegments;
if (numMissedSegments > 0) {
_serverMetrics.addMeteredTableValue(_tableNameWithType,
String.valueOf(_partitionId),
ServerMeter.UPSERT_MISSED_VALID_DOC_ID_SNAPSHOT_COUNT,
numMissedSegments);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 781b8b445c..6d77bbc535 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -40,6 +40,7 @@ public abstract class BaseTableUpsertMetadataManager
implements TableUpsertMetad
protected String _tableNameWithType;
protected UpsertContext _context;
protected UpsertConfig.ConsistencyMode _consistencyMode;
+ protected boolean _enablePreload;
@Override
public void init(TableConfig tableConfig, Schema schema, TableDataManager
tableDataManager) {
@@ -66,7 +67,8 @@ public abstract class BaseTableUpsertMetadataManager
implements TableUpsertMetad
String deleteRecordColumn = upsertConfig.getDeleteRecordColumn();
HashFunction hashFunction = upsertConfig.getHashFunction();
boolean enableSnapshot = upsertConfig.isEnableSnapshot();
- boolean enablePreload = upsertConfig.isEnablePreload();
+ _enablePreload =
+ enableSnapshot && upsertConfig.isEnablePreload() &&
tableDataManager.getSegmentPreloadExecutor() != null;
double metadataTTL = upsertConfig.getMetadataTTL();
double deletedKeysTTL = upsertConfig.getDeletedKeysTTL();
_consistencyMode = upsertConfig.getConsistencyMode();
@@ -78,16 +80,16 @@ public abstract class BaseTableUpsertMetadataManager
implements TableUpsertMetad
_context = new
UpsertContext.Builder().setTableConfig(tableConfig).setSchema(schema)
.setPrimaryKeyColumns(primaryKeyColumns).setComparisonColumns(comparisonColumns)
.setDeleteRecordColumn(deleteRecordColumn).setHashFunction(hashFunction)
-
.setPartialUpsertHandler(partialUpsertHandler).setEnableSnapshot(enableSnapshot).setEnablePreload(enablePreload)
-
.setMetadataTTL(metadataTTL).setDeletedKeysTTL(deletedKeysTTL).setConsistencyMode(_consistencyMode)
-
.setUpsertViewRefreshIntervalMs(upsertViewRefreshIntervalMs).setTableIndexDir(tableIndexDir)
- .setTableDataManager(tableDataManager).build();
+
.setPartialUpsertHandler(partialUpsertHandler).setEnableSnapshot(enableSnapshot)
+
.setEnablePreload(_enablePreload).setMetadataTTL(metadataTTL).setDeletedKeysTTL(deletedKeysTTL)
+
.setConsistencyMode(_consistencyMode).setUpsertViewRefreshIntervalMs(upsertViewRefreshIntervalMs)
+
.setTableIndexDir(tableIndexDir).setTableDataManager(tableDataManager).build();
LOGGER.info(
"Initialized {} for table: {} with primary key columns: {}, comparison
columns: {}, delete record column: {},"
+ " hash function: {}, upsert mode: {}, enable snapshot: {},
enable preload: {}, metadata TTL: {},"
+ " deleted Keys TTL: {}, consistency mode: {}, upsert view
refresh interval: {}ms, table index dir: {}",
getClass().getSimpleName(), _tableNameWithType, primaryKeyColumns,
comparisonColumns, deleteRecordColumn,
- hashFunction, upsertConfig.getMode(), enableSnapshot, enablePreload,
metadataTTL, deletedKeysTTL,
+ hashFunction, upsertConfig.getMode(), enableSnapshot, _enablePreload,
metadataTTL, deletedKeysTTL,
_consistencyMode, upsertViewRefreshIntervalMs, tableIndexDir);
initCustomVariables();
@@ -104,4 +106,9 @@ public abstract class BaseTableUpsertMetadataManager
implements TableUpsertMetad
public UpsertConfig.Mode getUpsertMode() {
return _context.getPartialUpsertHandler() == null ? UpsertConfig.Mode.FULL
: UpsertConfig.Mode.PARTIAL;
}
+
+ @Override
+ public boolean isEnablePreload() {
+ return _enablePreload;
+ }
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
index db21f2030e..123e3db553 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
@@ -41,6 +41,8 @@ public interface TableUpsertMetadataManager extends Closeable
{
UpsertConfig.Mode getUpsertMode();
+ boolean isEnablePreload();
+
/**
* Stops the metadata manager. After invoking this method, no access to the
metadata will be accepted.
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]