Jackie-Jiang commented on code in PR #16291:
URL: https://github.com/apache/pinot/pull/16291#discussion_r2191007035


##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/DeterministicConcurrentIndexedTable.java:
##########
@@ -52,4 +52,24 @@ protected void upsertWithoutOrderBy(Key key, Record record) {
       map.pollLastEntry(); // evict the largest key after insertion
     }
   }
+
+  /**
+   * Adds a record with new key or updates a record with existing key.
+   * NOTE: {@code compute} method of {@code ConcurrentSkipListMap} is not atomic, thus it's not used
+   */
+  @Override
+  protected void addOrUpdateRecord(Key key, Record newRecord) {
+    Record existingRecord = _lookupMap.putIfAbsent(key, newRecord);
+    if (existingRecord == null) {
+      // if no key was associated
+      return;
+    }
+    while (true) {
+      Record oldRecord = _lookupMap.get(key);
+      Record updatedRecord = updateRecord(oldRecord.copy(), newRecord);

Review Comment:
   Some aggregation functions update the value in place (e.g.
`DistinctCountHLLAggregationFunction`). We need to guarantee that each value
is aggregated exactly once.
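
To illustrate the concern, here is a minimal, self-contained sketch. It does not use Pinot classes: `MutableSum` is a hypothetical stand-in for an aggregate that merges in place, and `retryWithSharedValue` simulates an optimistic loop whose earlier attempts lose a race and retry. Because the copied array still points at the same aggregate object, every retry merges the incoming value again.

```java
import java.util.concurrent.ConcurrentSkipListMap;

public class InPlaceMergeSketch {
  // Hypothetical stand-in for an aggregate that merges in place.
  static final class MutableSum {
    long _sum;

    MutableSum(long sum) {
      _sum = sum;
    }

    MutableSum mergeInPlace(MutableSum other) {
      _sum += other._sum;
      return this;
    }
  }

  // Runs the read-copy-merge step (failedAttempts + 1) times, as a retry loop would
  // if the first failedAttempts replace() calls lost a race.
  static long retryWithSharedValue(int failedAttempts) {
    ConcurrentSkipListMap<String, Object[]> map = new ConcurrentSkipListMap<>();
    map.put("k", new Object[]{new MutableSum(10)});
    Object[] incoming = {new MutableSum(5)};
    for (int attempt = 0; attempt <= failedAttempts; attempt++) {
      Object[] old = map.get("k");
      Object[] shallow = old.clone(); // copies the references, not the aggregates
      ((MutableSum) shallow[0]).mergeInPlace((MutableSum) incoming[0]);
      if (attempt == failedAttempts) {
        map.replace("k", old, shallow); // only the last attempt "wins"
      }
    }
    return ((MutableSum) map.get("k")[0])._sum;
  }
}
```

With no failed attempt the stored sum is 15; with one simulated retry it becomes 20, because the shared `MutableSum` was already mutated by the attempt that was supposed to be discarded.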



##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/DeterministicConcurrentIndexedTable.java:
##########
@@ -52,4 +52,24 @@ protected void upsertWithoutOrderBy(Key key, Record record) {
       map.pollLastEntry(); // evict the largest key after insertion
     }
   }
+
+  /**
+   * Adds a record with new key or updates a record with existing key.
+   * NOTE: {@code compute} method of {@code ConcurrentSkipListMap} is not atomic, thus it's not used
+   */
+  @Override
+  protected void addOrUpdateRecord(Key key, Record newRecord) {
+    Record existingRecord = _lookupMap.putIfAbsent(key, newRecord);
+    if (existingRecord == null) {
+      // if no key was associated
+      return;
+    }
+    while (true) {
+      Record oldRecord = _lookupMap.get(key);
+      Record updatedRecord = updateRecord(oldRecord.copy(), newRecord);

Review Comment:
   Also, `updateRecord()` itself is not thread-safe. We cannot allow multiple
threads to call it concurrently.
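
One possible way to serialize the merge, sketched below, is to lock on the record that is already stored for the key and merge into it once. This is only an illustration, not what the PR does or should do: `Holder` and `addOrUpdate` are hypothetical names, the sketch assumes the unsafe state is confined to the stored value, and it ignores eviction or replacement of entries, which would need extra care.

```java
import java.util.concurrent.ConcurrentSkipListMap;

public class SerializedUpdateSketch {
  // Hypothetical stand-in for a record whose merge is not thread-safe.
  static final class Holder {
    long _sum;

    Holder(long sum) {
      _sum = sum;
    }
  }

  private final ConcurrentSkipListMap<String, Holder> _map = new ConcurrentSkipListMap<>();

  void addOrUpdate(String key, long value) {
    Holder existing = _map.putIfAbsent(key, new Holder(value));
    if (existing == null) {
      return; // the new holder was inserted; nothing to merge
    }
    // Only one thread at a time may merge into this key's holder, and it merges once.
    synchronized (existing) {
      existing._sum += value;
    }
  }

  long get(String key) {
    Holder holder = _map.get(key);
    synchronized (holder) {
      return holder._sum;
    }
  }

  // Starts the given number of threads, each adding 1 to the same key updatesPerThread times.
  static long runConcurrently(int threads, int updatesPerThread) {
    SerializedUpdateSketch table = new SerializedUpdateSketch();
    Thread[] workers = new Thread[threads];
    for (int i = 0; i < threads; i++) {
      workers[i] = new Thread(() -> {
        for (int j = 0; j < updatesPerThread; j++) {
          table.addOrUpdate("k", 1L);
        }
      });
      workers[i].start();
    }
    try {
      for (Thread worker : workers) {
        worker.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return table.get("k");
  }
}
```

In this sketch, `runConcurrently(8, 1000)` ends with a sum of 8000, since every update is applied exactly once under the per-key lock.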



##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/Record.java:
##########
@@ -49,6 +49,11 @@ public Record(Object[] values) {
     _values = values;
   }
 
+  ///  deep copy

Review Comment:
   This is not a deep copy; it only copies the references.
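
The difference can be seen with a plain `Object[]`, sketched here without any Pinot classes. The assumption is that the copy is made with something like `Arrays.copyOf`, which duplicates the array but not the objects it refers to; the class and method names below are illustrative only.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class ShallowCopySketch {
  // Returns true if mutating an element reached through the copy is visible in the original.
  @SuppressWarnings("unchecked")
  static boolean shallowCopySharesElements() {
    Object[] values = {"key", new HashSet<>(Arrays.asList(1, 2))};
    Object[] copy = Arrays.copyOf(values, values.length); // new array, same element objects
    ((Set<Integer>) copy[1]).add(3);
    return ((Set<Integer>) values[1]).contains(3);
  }

  // Copies the mutable element as well, then returns the size of the original set.
  @SuppressWarnings("unchecked")
  static int originalSizeAfterDeepCopy() {
    Object[] values = {"key", new HashSet<>(Arrays.asList(1, 2))};
    Object[] copy = Arrays.copyOf(values, values.length);
    copy[1] = new HashSet<>((Set<Integer>) values[1]); // duplicate the mutable element too
    ((Set<Integer>) copy[1]).add(3);
    return ((Set<Integer>) values[1]).size();
  }
}
```

The first method returns `true` because both arrays reference the same set; the second returns 2 because the original set is no longer shared.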



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

