Jackie-Jiang commented on code in PR #16291:
URL: https://github.com/apache/pinot/pull/16291#discussion_r2191007035

##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/DeterministicConcurrentIndexedTable.java:
##########
@@ -52,4 +52,24 @@ protected void upsertWithoutOrderBy(Key key, Record record) {
       map.pollLastEntry(); // evict the largest key after insertion
     }
   }
+
+  /**
+   * Adds a record with new key or updates a record with existing key.
+   * NOTE: {@code compute} method of {@code ConcurrentSkipListMap} is not atomic, thus it's not used
+   */
+  @Override
+  protected void addOrUpdateRecord(Key key, Record newRecord) {
+    Record existingRecord = _lookupMap.putIfAbsent(key, newRecord);
+    if (existingRecord == null) {
+      // if no key was associated
+      return;
+    }
+    while (true) {
+      Record oldRecord = _lookupMap.get(key);
+      Record updatedRecord = updateRecord(oldRecord.copy(), newRecord);

Review Comment:
   Some aggregates update the value in place (e.g. `DistinctCountHLLAggregationFunction`). We need to guarantee that the value is aggregated only once.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/DeterministicConcurrentIndexedTable.java:
##########
@@ -52,4 +52,24 @@ protected void upsertWithoutOrderBy(Key key, Record record) {
       map.pollLastEntry(); // evict the largest key after insertion
     }
   }
+
+  /**
+   * Adds a record with new key or updates a record with existing key.
+   * NOTE: {@code compute} method of {@code ConcurrentSkipListMap} is not atomic, thus it's not used
+   */
+  @Override
+  protected void addOrUpdateRecord(Key key, Record newRecord) {
+    Record existingRecord = _lookupMap.putIfAbsent(key, newRecord);
+    if (existingRecord == null) {
+      // if no key was associated
+      return;
+    }
+    while (true) {
+      Record oldRecord = _lookupMap.get(key);
+      Record updatedRecord = updateRecord(oldRecord.copy(), newRecord);

Review Comment:
   Also, `updateRecord()` itself is not thread-safe. We cannot allow multiple threads to call it concurrently.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/Record.java:
##########
@@ -49,6 +49,11 @@ public Record(Object[] values) {
     _values = values;
   }
 
+  /// deep copy

Review Comment:
   This is not a deep copy. It only copies the references, so the copied record still shares its value objects with the original.
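
To make the three comments above concrete, here is a minimal, self-contained sketch. It is not Pinot code: `Rec`, `rec`, `mergeInPlace`, and `addOrUpdate` are hypothetical names, and a `long[]` accumulator stands in for a mutable aggregation state such as an HLL. The first part shows that merging into a reference-only copy also mutates the original, which is why a copy-then-retry loop can aggregate the same value more than once. The second part shows one possible way (an assumption, not necessarily what the PR should do) to run the in-place merge exactly once per incoming record and never concurrently for the same key, by locking on the stored record.

```java
import java.util.concurrent.ConcurrentSkipListMap;

public class InPlaceMergeSketch {
  // Hypothetical stand-in for a record whose values may be mutable aggregation states.
  static final class Rec {
    final Object[] values;

    Rec(Object[] values) {
      this.values = values;
    }

    // Copies only the array of references; the elements stay shared with the original.
    Rec shallowCopy() {
      return new Rec(values.clone());
    }
  }

  // Builds a record holding a single mutable counter (stand-in for an in-place aggregate).
  static Rec rec(long n) {
    return new Rec(new Object[] {new long[] {n}});
  }

  // Hypothetical in-place merge: adds the incoming count into the existing accumulator.
  static Rec mergeInPlace(Rec existing, Rec incoming) {
    long[] acc = (long[]) existing.values[0];
    acc[0] += ((long[]) incoming.values[0])[0];
    return existing;
  }

  static final ConcurrentSkipListMap<String, Rec> MAP = new ConcurrentSkipListMap<>();

  // Assumed approach: the stored record is never replaced, so its monitor can
  // serialize merges for that key. Each incoming record is merged exactly once.
  static void addOrUpdate(String key, Rec incoming) {
    Rec existing = MAP.putIfAbsent(key, incoming);
    if (existing == null) {
      return; // first record for this key, nothing to merge
    }
    synchronized (existing) {
      mergeInPlace(existing, incoming);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    // Part 1: merging into a shallow copy leaks into the original.
    Rec original = rec(1);
    mergeInPlace(original.shallowCopy(), rec(5));
    System.out.println(((long[]) original.values[0])[0]); // prints 6

    // Part 2: four threads each add 1000 records with count 1 under the same key.
    Runnable worker = () -> {
      for (int i = 0; i < 1000; i++) {
        addOrUpdate("k", rec(1));
      }
    };
    Thread[] threads = new Thread[4];
    for (int t = 0; t < threads.length; t++) {
      threads[t] = new Thread(worker);
      threads[t].start();
    }
    for (Thread thread : threads) {
      thread.join();
    }
    System.out.println(((long[]) MAP.get("k").values[0])[0]); // prints 4000
  }
}
```

The sketch ignores eviction and ordering, which the real table also has to handle. It only illustrates why the copy in the diff does not isolate the retry loop from in-place aggregates.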