Copilot commented on code in PR #17503:
URL: https://github.com/apache/pinot/pull/17503#discussion_r2922296659
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -192,8 +213,62 @@ protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey> primaryK
      _primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey, _hashFunction),
          (pk, recordLocation) -> {
            if (recordLocation.getSegment() == segment) {
+              if (_context.isTableTypeInconsistentDuringConsumption() && segment instanceof MutableSegment) {
+                _previousKeyToRecordLocationMap.remove(pk);
+              }
+              return null;
+            }
+            return recordLocation;
+          });
+    }
+  }
+
+  @Override
+  protected void revertAndRemoveSegment(IndexSegment segment,
+      Iterator<Map.Entry<Integer, PrimaryKey>> primaryKeyIterator) {
+    while (primaryKeyIterator.hasNext()) {
+      Map.Entry<Integer, PrimaryKey> primaryKeyEntry = primaryKeyIterator.next();
+      PrimaryKey primaryKey = primaryKeyEntry.getValue();
+      int docId = primaryKeyEntry.getKey();
+      _primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey, _hashFunction),
+          (pk, recordLocation) -> {
+            RecordLocation prevLocation = _previousKeyToRecordLocationMap.get(primaryKey);
+            if (prevLocation == null) {
+              _previousKeyToRecordLocationMap.remove(primaryKey);
              return null;
Review Comment:
`revertAndRemoveSegment()` looks up/removes entries in
`_previousKeyToRecordLocationMap` using the un-hashed `PrimaryKey` object
(`get(primaryKey)` / `remove(primaryKey)`), but this map is keyed by the hashed
primary-key object used in `_primaryKeyToRecordLocationMap.compute(...)`. As a
result, `prevLocation` will almost always be null and the code will incorrectly
remove primary keys from the upsert metadata instead of reverting, and will
also leak entries in `_previousKeyToRecordLocationMap`. Use the same hashed key
(`pk`) for get/remove operations.
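
To make the failure mode concrete, here is a minimal, runnable sketch of the key-type mismatch. All class and method names below are simplified, hypothetical stand-ins (not Pinot's actual types); `hash(..)` plays the role of `HashUtils.hashPrimaryKey(..)`, which can return a different object than the raw `PrimaryKey` when a hash function is configured:

```java
import java.util.HashMap;
import java.util.Map;

class HashedKeyLookupDemo {
  static final Map<Object, String> PREVIOUS_KEY_MAP = new HashMap<>();

  // Stand-in for HashUtils.hashPrimaryKey(primaryKey, hashFunction): returns
  // a different object than the raw key, so Map lookups with the raw key miss.
  static Object hash(String rawPrimaryKey) {
    return "hashed:" + rawPrimaryKey;
  }

  static void populate(String rawPrimaryKey, String recordLocation) {
    // The map is populated with the hashed key, like the `pk` argument
    // passed into the computeIfPresent lambda.
    PREVIOUS_KEY_MAP.put(hash(rawPrimaryKey), recordLocation);
  }

  // Buggy pattern from the diff: look up with the un-hashed key.
  static String buggyLookup(String rawPrimaryKey) {
    return PREVIOUS_KEY_MAP.get(rawPrimaryKey);
  }

  // Suggested fix: use the same hashed key for get/remove.
  static String fixedLookup(String rawPrimaryKey) {
    return PREVIOUS_KEY_MAP.get(hash(rawPrimaryKey));
  }

  public static void main(String[] args) {
    populate("pk-1", "recordLocation");
    System.out.println("buggy lookup: " + buggyLookup("pk-1"));  // null: key never matches
    System.out.println("fixed lookup: " + fixedLookup("pk-1"));  // hits the stored location
  }
}
```

Because only lookups made with the same hashed key ever hit, the un-hashed `get`/`remove` calls silently behave as if no previous location was recorded.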
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -1795,15 +1795,13 @@ public static void checkForDuplicates(List<String> columns) {
    * @param tableConfig the table config to check, may be null
    * @return true if the table has inconsistent state configs, false if tableConfig is null or no issues found
    */
-  public static boolean checkForInconsistentStateConfigs(@Nullable TableConfig tableConfig) {
+  public static boolean isTableTypeInconsistentDuringConsumption(@Nullable TableConfig tableConfig) {
     UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
     if (upsertConfig == null) {
       return false;
     }
-    return tableConfig.getReplication() > 1 && (
-        upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL
-        || (upsertConfig.isDropOutOfOrderRecord()
-            && upsertConfig.getConsistencyMode() == UpsertConfig.ConsistencyMode.NONE));
+    return (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL || upsertConfig.isDropOutOfOrderRecord()
+        || upsertConfig.getOutOfOrderRecordColumn() != null);
Review Comment:
`isTableTypeInconsistentDuringConsumption(@Nullable TableConfig tableConfig)` dereferences `tableConfig` without a null check (`tableConfig.getUpsertConfig()`), but the method contract/Javadoc says the argument may be null. This can cause an immediate NPE in callers that pass null table configs. Add an explicit `if (tableConfig == null) { return false; }` guard (or tighten the signature/Javadoc).
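
The suggested guard can be sketched as follows. `TableConfig` and `UpsertConfig` here are tiny hypothetical stand-ins so the pattern runs standalone; they are not Pinot's real classes:

```java
class NullGuardDemo {
  static class UpsertConfig {
    final boolean partialUpsert;
    UpsertConfig(boolean partialUpsert) { this.partialUpsert = partialUpsert; }
  }

  static class TableConfig {
    final UpsertConfig upsertConfig;  // may be null
    TableConfig(UpsertConfig upsertConfig) { this.upsertConfig = upsertConfig; }
    UpsertConfig getUpsertConfig() { return upsertConfig; }
  }

  static boolean isTableTypeInconsistentDuringConsumption(TableConfig tableConfig) {
    // Explicit guard: a null table config matches the documented @Nullable
    // contract instead of throwing an NPE on tableConfig.getUpsertConfig().
    if (tableConfig == null) {
      return false;
    }
    UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
    if (upsertConfig == null) {
      return false;
    }
    return upsertConfig.partialUpsert;
  }
}
```

The guard keeps the method's behavior consistent with its Javadoc ("false if tableConfig is null") without forcing every caller to null-check first.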
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java:
##########
@@ -271,32 +322,87 @@ public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutable
   }
   @Override
-  protected void removeNewlyAddedKeys(IndexSegment oldSegment) {
-    int removedKeys = 0;
-    for (Map.Entry<Object, RecordLocation> entry : _newlyAddedKeys.entrySet()) {
-      if (entry.getValue().getSegment() == oldSegment) {
-        _primaryKeyToRecordLocationMap.remove(entry.getKey());
-        removeDocId(oldSegment, entry.getValue().getDocId());
-        removedKeys++;
-      }
+  protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey> primaryKeyIterator) {
+    // We need to decrease the distinctSegmentCount for each unique primary key in this deleting segment by 1
+    // as the occurrence of the key in this segment is being removed. We are taking a set of unique primary keys
+    // to avoid double counting the same key in the same segment.
+    Set<Object> uniquePrimaryKeys = new HashSet<>();
+    while (primaryKeyIterator.hasNext()) {
+      PrimaryKey primaryKey = primaryKeyIterator.next();
+      _primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey, _hashFunction),
+          (pk, recordLocation) -> {
+            if (recordLocation.getSegment() == segment) {
+              if (_context.isTableTypeInconsistentDuringConsumption() && segment instanceof MutableSegment) {
+                _previousKeyToRecordLocationMap.remove(pk);
+              }
+              return null;
+            }
+            if (!uniquePrimaryKeys.add(pk)) {
+              return recordLocation;
+            }
+            return new RecordLocation(recordLocation.getSegment(), recordLocation.getDocId(),
+                recordLocation.getComparisonValue(),
+                RecordLocation.decrementSegmentCount(recordLocation.getDistinctSegmentCount()));
+          });
     }
-    _logger.info("Removed newly added {} keys for the segment: {} out of : {}", removedKeys,
-        oldSegment.getSegmentName(), _previousKeyToRecordLocationMap.size());
   }
   @Override
-  protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey> primaryKeyIterator) {
+  protected void revertAndRemoveSegment(IndexSegment segment,
+      Iterator<Map.Entry<Integer, PrimaryKey>> primaryKeyIterator) {
     // We need to decrease the distinctSegmentCount for each unique primary key in this deleting segment by 1
     // as the occurrence of the key in this segment is being removed. We are taking a set of unique primary keys
     // to avoid double counting the same key in the same segment.
     Set<Object> uniquePrimaryKeys = new HashSet<>();
     while (primaryKeyIterator.hasNext()) {
-      PrimaryKey primaryKey = primaryKeyIterator.next();
+      Map.Entry<Integer, PrimaryKey> primaryKeyEntry = primaryKeyIterator.next();
+      PrimaryKey primaryKey = primaryKeyEntry.getValue();
+      int docId = primaryKeyEntry.getKey();
      _primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey, _hashFunction),
          (pk, recordLocation) -> {
-            if (recordLocation.getSegment() == segment) {
+            RecordLocation prevLocation = _previousKeyToRecordLocationMap.get(primaryKey);
+            if (prevLocation == null) {
+              _previousKeyToRecordLocationMap.remove(primaryKey);
              return null;
Review Comment:
`revertAndRemoveSegment()` accesses `_previousKeyToRecordLocationMap` with
the `PrimaryKey` instance (`get(primaryKey)` / `remove(primaryKey)`), but the
map is populated with the hashed key object from
`_primaryKeyToRecordLocationMap.compute(...)`. This key-type mismatch means
`prevLocation` will typically be null and the code will drop keys instead of
reverting metadata (and can leave stale entries in
`_previousKeyToRecordLocationMap`). Use the hashed key (`pk`) consistently for
lookups/removals.
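
A runnable sketch of the consistent-key version of the revert, using the lambda's hashed key `pk` for both maps. All types and names are simplified, hypothetical stand-ins for Pinot's, just to show the `computeIfPresent` shape of the fix:

```java
import java.util.HashMap;
import java.util.Map;

class RevertSketch {
  static final Map<Object, String> PRIMARY_KEY_MAP = new HashMap<>();
  static final Map<Object, String> PREVIOUS_KEY_MAP = new HashMap<>();

  // Stand-in for HashUtils.hashPrimaryKey(primaryKey, hashFunction).
  static Object hash(String rawPrimaryKey) {
    return "hashed:" + rawPrimaryKey;
  }

  static void revert(String rawPrimaryKey) {
    PRIMARY_KEY_MAP.computeIfPresent(hash(rawPrimaryKey), (pk, recordLocation) -> {
      // Consistent: get/remove with the hashed key `pk`, the same key that
      // was used to populate PREVIOUS_KEY_MAP.
      String prevLocation = PREVIOUS_KEY_MAP.remove(pk);
      if (prevLocation == null) {
        return null;  // no previous state recorded: drop the entry
      }
      return prevLocation;  // restore the previous record location
    });
  }

  public static void main(String[] args) {
    PRIMARY_KEY_MAP.put(hash("pk-1"), "newLocation");
    PREVIOUS_KEY_MAP.put(hash("pk-1"), "oldLocation");
    revert("pk-1");
    System.out.println(PRIMARY_KEY_MAP.get(hash("pk-1")));  // oldLocation
  }
}
```

With the raw-key version, `PREVIOUS_KEY_MAP.remove(pk)` would instead be a pair of misses (`get(primaryKey)` returning null, `remove(primaryKey)` removing nothing), so every entry would take the drop branch and the previous-location entry would leak.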
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -2773,21 +2774,24 @@ public PauseState resumeTopicsConsumption(String tableNameWithType, List<Integer
    * Validates that force commit is allowed for the given table.
    * Throws IllegalStateException if force commit is disabled for partial-upsert tables
    * or upsert tables with dropOutOfOrder enabled when replication > 1.
+   * Force commit is always allowed for tables without inconsistent state configs.
    */
   private void validateForceCommitAllowed(String tableNameWithType) {
     TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType);
     if (tableConfig == null) {
       throw new IllegalStateException("Table config not found for table: " + tableNameWithType);
     }
-    UpsertInconsistentStateConfig configInstance = UpsertInconsistentStateConfig.getInstance();
-    if (!configInstance.isForceCommitReloadAllowed(tableConfig)) {
-      throw new IllegalStateException(
-          "Force commit disabled for table: " + tableNameWithType
-          + ". Table is configured as partial upsert or dropOutOfOrderRecord=true with replication > 1, "
-          + "which can cause data inconsistency during force commit. "
-          + "Current cluster config '" + configInstance.getConfigKey() + "' is set to: "
-          + configInstance.isForceCommitReloadEnabled()
-          + ". To enable force commit, set this config to 'true'.");
+    // Only restrict force commit for tables with inconsistent state configs
+    // (partial upsert or dropOutOfOrder tables with replication > 1)
+    boolean isInconsistentMetadataDuringConsumption =
+        TableConfigUtils.isTableTypeInconsistentDuringConsumption(tableConfig);
+    ConsumingSegmentConsistencyModeListener configInstance = ConsumingSegmentConsistencyModeListener.getInstance();
+    if (!configInstance.isForceCommitAllowed() && isInconsistentMetadataDuringConsumption) {
+      throw new IllegalStateException("Force commit disabled for table: " + tableNameWithType
+          + ". Table is configured as partial upsert or dropOutOfOrderRecord=true with replication > 1, "
+          + "which can cause data inconsistency during force commit. " + "Current cluster config '"
+          + configInstance.getConfigKey() + "' is set to: " + configInstance.getConsistencyMode()
+          + ". To enable safer force commit, set isTableTypeInconsistentDuringConsumption config to 'PROTECTED'.");
Review Comment:
The IllegalStateException message suggests setting
"isTableTypeInconsistentDuringConsumption config" to `PROTECTED`, but
`isTableTypeInconsistentDuringConsumption` is a method, not a cluster config.
This makes the guidance misleading; it should instruct users to set the cluster
config key (`pinot.server.consuming.segment.consistency.mode`, i.e.
`configInstance.getConfigKey()`) to `PROTECTED`.
```suggestion
          + ". To enable safer force commit, set cluster config '" + configInstance.getConfigKey()
          + "' to 'PROTECTED'.");
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]