zhangyue19921010 commented on code in PR #12982:
URL: https://github.com/apache/hudi/pull/12982#discussion_r1999997831
##########
hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java:
##########
@@ -92,7 +92,7 @@ public class ExternalSpillableMap<T extends Serializable, R>
implements Map<T, R
public ExternalSpillableMap(long maxInMemorySizeInBytes, String
baseFilePath, SizeEstimator<T> keySizeEstimator,
SizeEstimator<R> valueSizeEstimator, DiskMapType
diskMapType, CustomSerializer<R> valueSerializer,
boolean isCompressionEnabled, String
loggingContext) throws IOException {
- this.inMemoryMap = new HashMap<>();
+ this.inMemoryMap = new ConcurrentHashMap<>();
Review Comment:
We need to carefully consider whether we should make
the`ExternalSpillableMap` thread-safe, including its put, get, and remove
functions. Simply replacing it with a `ConcurrentHashMap` might not be
sufficient. Take put function as example
```
public R put(T key, R value) {
if (this.estimatedPayloadSize == 0) {
// At first, use the sizeEstimate of a record being inserted into the
spillable map.
// Note, the converter may over-estimate the size of a record in the
JVM
this.estimatedPayloadSize = keySizeEstimator.sizeEstimate(key) +
valueSizeEstimator.sizeEstimate(value);
} else if (this.inMemoryMap.size() %
NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 0) {
this.estimatedPayloadSize = (long) (this.estimatedPayloadSize * 0.9 +
(keySizeEstimator.sizeEstimate(key) + valueSizeEstimator.sizeEstimate(value)) *
0.1);
this.currentInMemoryMapSize = this.inMemoryMap.size() *
this.estimatedPayloadSize;
if (this.inMemoryMap.size() /
NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 1) {
LOG.info("{} : Updated Estimated Payload size {}", loggingContext,
this.estimatedPayloadSize);
}
}
if (this.inMemoryMap.containsKey(key)) {
this.inMemoryMap.put(key, value);
} else if (this.currentInMemoryMapSize < this.maxInMemorySizeInBytes) {
this.currentInMemoryMapSize += this.estimatedPayloadSize;
// Remove the old version of the record from disk first to avoid data
duplication.
if (inDiskContainsKey(key)) {
diskBasedMap.remove(key);
}
this.inMemoryMap.put(key, value);
} else {
if (diskBasedMap == null) {
initDiskBasedMap();
}
diskBasedMap.put(key, value);
}
return value;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]