zhangyue19921010 commented on code in PR #12982:
URL: https://github.com/apache/hudi/pull/12982#discussion_r1999997831


##########
hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java:
##########
@@ -92,7 +92,7 @@ public class ExternalSpillableMap<T extends Serializable, R> 
implements Map<T, R
   public ExternalSpillableMap(long maxInMemorySizeInBytes, String 
baseFilePath, SizeEstimator<T> keySizeEstimator,
                               SizeEstimator<R> valueSizeEstimator, DiskMapType 
diskMapType, CustomSerializer<R> valueSerializer,
                               boolean isCompressionEnabled, String 
loggingContext) throws IOException {
-    this.inMemoryMap = new HashMap<>();
+    this.inMemoryMap = new ConcurrentHashMap<>();

Review Comment:
   We need to carefully consider whether we should make 
the`ExternalSpillableMap` thread-safe, including its put, get, and remove 
functions. Simply replacing it with a `ConcurrentHashMap` might not be 
sufficient. Take put function as example 
   ```
     public R put(T key, R value) {
       if (this.estimatedPayloadSize == 0) {
         // At first, use the sizeEstimate of a record being inserted into the 
spillable map.
         // Note, the converter may over-estimate the size of a record in the 
JVM
         this.estimatedPayloadSize = keySizeEstimator.sizeEstimate(key) + 
valueSizeEstimator.sizeEstimate(value);
       } else if (this.inMemoryMap.size() % 
NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 0) {
         this.estimatedPayloadSize = (long) (this.estimatedPayloadSize * 0.9 + 
(keySizeEstimator.sizeEstimate(key) + valueSizeEstimator.sizeEstimate(value)) * 
0.1);
         this.currentInMemoryMapSize = this.inMemoryMap.size() * 
this.estimatedPayloadSize;
         if (this.inMemoryMap.size() / 
NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 1) {
           LOG.info("{} : Updated Estimated Payload size {}", loggingContext, 
this.estimatedPayloadSize);
         }
       }
   
       if (this.inMemoryMap.containsKey(key)) {
         this.inMemoryMap.put(key, value);
       } else if (this.currentInMemoryMapSize < this.maxInMemorySizeInBytes) {
         this.currentInMemoryMapSize += this.estimatedPayloadSize;
         // Remove the old version of the record from disk first to avoid data 
duplication.
         if (inDiskContainsKey(key)) {
           diskBasedMap.remove(key);
         }
         this.inMemoryMap.put(key, value);
       } else {
         if (diskBasedMap == null) {
           initDiskBasedMap();
         }
         diskBasedMap.put(key, value);
       }
       return value;
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to