This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 75a4bc3f39 Allow custom operations before upsert preload starts (#11265)
75a4bc3f39 is described below
commit 75a4bc3f39ef825665b368929520a543b74bfd54
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Aug 4 06:03:48 2023 -0700
Allow custom operations before upsert preload starts (#11265)
---
.../upsert/BaseTableUpsertMetadataManager.java | 51 ++++++++++++++++------
1 file changed, 37 insertions(+), 14 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 148248b0be..2fecc4fbe5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -57,9 +57,10 @@ import org.slf4j.LoggerFactory;
@ThreadSafe
public abstract class BaseTableUpsertMetadataManager implements
TableUpsertMetadataManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(BaseTableUpsertMetadataManager.class);
- private TableConfig _tableConfig;
- private Schema _schema;
- private TableDataManager _tableDataManager;
+
+ protected TableConfig _tableConfig;
+ protected Schema _schema;
+ protected TableDataManager _tableDataManager;
protected String _tableNameWithType;
protected List<String> _primaryKeyColumns;
protected List<String> _comparisonColumns;
@@ -70,6 +71,9 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
protected double _metadataTTL;
protected File _tableIndexDir;
protected ServerMetrics _serverMetrics;
+ protected HelixManager _helixManager;
+ protected ExecutorService _segmentPreloadExecutor;
+
private volatile boolean _isPreloading = false;
@Override
@@ -109,6 +113,11 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
_metadataTTL = upsertConfig.getMetadataTTL();
_tableIndexDir = tableDataManager.getTableDataDir();
_serverMetrics = serverMetrics;
+ _helixManager = helixManager;
+ _segmentPreloadExecutor = segmentPreloadExecutor;
+
+ initCustomVariables();
+
if (_enableSnapshot && segmentPreloadExecutor != null &&
upsertConfig.isEnablePreload()) {
// Preloading the segments with snapshots for fast upsert metadata
recovery.
// Note that there is an implicit waiting logic between the thread doing
the segment preloading here and the
@@ -119,7 +128,7 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
// happens as the lambda of ConcurrentHashMap.computeIfAbsent() method,
which ensures the waiting logic.
try {
_isPreloading = true;
- preloadSegments(helixManager, segmentPreloadExecutor);
+ preloadSegments();
} catch (Exception e) {
// Even if preloading fails, we should continue to complete the
initialization, so that TableDataManager can be
// created. Once TableDataManager is created, no more segment
preloading would happen, and the normal segment
@@ -136,16 +145,24 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
}
}
+ /**
+ * Can be overridden to initialize custom variables after other variables are set but before preload starts. This is
+ * needed because preload will load segments which might require these custom variables.
+ */
+ protected void initCustomVariables() {
+ }
+
/**
* Get the ideal state and find segments assigned to current instance, then preload those with validDocIds snapshot.
* Skip those without the snapshots and those whose crc has changed, as they will be handled by normal Helix state
* transitions, which will proceed after the preloading phase fully completes.
*/
- private void preloadSegments(HelixManager helixManager, ExecutorService
segmentPreloadExecutor)
+ private void preloadSegments()
throws Exception {
LOGGER.info("Preload segments from table: {} for fast upsert metadata
recovery", _tableNameWithType);
- IdealState idealState = HelixHelper.getTableIdealState(helixManager,
_tableNameWithType);
- ZkHelixPropertyStore<ZNRecord> propertyStore =
helixManager.getHelixPropertyStore();
+ onPreloadStart();
+ IdealState idealState = HelixHelper.getTableIdealState(_helixManager,
_tableNameWithType);
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
_helixManager.getHelixPropertyStore();
String instanceId = getInstanceId();
IndexLoadingConfig indexLoadingConfig = createIndexLoadingConfig();
List<Future<?>> futures = new ArrayList<>();
@@ -156,7 +173,7 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
LOGGER.info("Skip segment: {} as its ideal state: {} is not ONLINE",
segmentName, state);
continue;
}
- futures.add(segmentPreloadExecutor.submit(() -> {
+ futures.add(_segmentPreloadExecutor.submit(() -> {
try {
preloadSegment(segmentName, indexLoadingConfig, propertyStore);
} catch (Exception e) {
@@ -179,6 +196,18 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
LOGGER.info("Preloaded segments from table: {} for fast upsert metadata
recovery", _tableNameWithType);
}
+ /**
+ * Can be overridden to perform operations before preload starts.
+ */
+ protected void onPreloadStart() {
+ }
+
+ /**
+ * Can be overridden to perform operations after preload is done.
+ */
+ protected void onPreloadFinish() {
+ }
+
private String getInstanceId() {
InstanceDataManagerConfig instanceDataManagerConfig =
_tableDataManager.getTableDataManagerConfig().getInstanceDataManagerConfig();
@@ -226,12 +255,6 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
return new File(SegmentDirectoryPaths.findSegmentDirectory(indexDir),
V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME);
}
- /**
- * Can be overridden to perform operations after preload is done.
- */
- protected void onPreloadFinish() {
- }
-
@Override
public boolean isPreloading() {
return _isPreloading;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]