wgtmac commented on code in PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#discussion_r1018707351
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##########
@@ -44,8 +45,15 @@ public class CodecFactory implements CompressionCodecFactory {
   protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections
       .synchronizedMap(new HashMap<String, CompressionCodec>());
-  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
-  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
+  /*
+  See: https://issues.apache.org/jira/browse/PARQUET-2126
+  The old implementation stored a single global instance of each type of compressor and decompressor, which
+  broke thread safety. The solution here is to store one instance of each codec type per-thread.
+  Normally, one would use ThreadLocal<> here, but the release() method needs to iterate over all codecs
+  ever created, so we have to implement the per-thread management explicitly.
+  */
+  private final Map<Thread, Map<CompressionCodecName, BytesCompressor>> allCompressors = new ConcurrentHashMap<>();
Review Comment:
CMIIW, the parquet writer always creates a new `CodecFactory` internally, so `getCompressor` does not suffer from any thread-safety issue.
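
The per-thread bookkeeping described in the code comment above can be sketched as follows. This is a hypothetical illustration, not the actual parquet-mr code: the generic `PerThreadCache` type is invented for this sketch, and the real patch stores `BytesCompressor`/`BytesDecompressor` instances rather than arbitrary values.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;

// Sketch of the per-thread scheme in the diff above: a shared outer map
// keyed by Thread, with one inner map per thread. Unlike a plain
// ThreadLocal, releaseAll() can still reach every instance ever created.
public class PerThreadCache<K, V> {
  private final Map<Thread, Map<K, V>> perThread = new ConcurrentHashMap<>();

  public V get(K key, Function<K, V> factory) {
    // One inner map per calling thread; computeIfAbsent keeps creation lazy
    // and atomic, so each thread sees its own single instance per key.
    return perThread
        .computeIfAbsent(Thread.currentThread(), t -> new ConcurrentHashMap<>())
        .computeIfAbsent(key, factory);
  }

  // The reason ThreadLocal was not used: release must visit all instances.
  public void releaseAll(Consumer<V> releaser) {
    for (Map<K, V> m : perThread.values()) {
      m.values().forEach(releaser);
    }
    // Note: until this runs, entries for threads that have exited linger,
    // which is exactly the growth concern raised in the review below.
    perThread.clear();
  }

  public static void main(String[] args) {
    PerThreadCache<String, StringBuilder> cache = new PerThreadCache<>();
    StringBuilder a = cache.get("snappy", k -> new StringBuilder(k));
    StringBuilder b = cache.get("snappy", k -> new StringBuilder(k));
    System.out.println("same instance: " + (a == b)); // prints "same instance: true"
  }
}
```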
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##########
@@ -44,8 +45,15 @@ public class CodecFactory implements CompressionCodecFactory {
   protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections
       .synchronizedMap(new HashMap<String, CompressionCodec>());
-  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
-  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
+  /*
+  See: https://issues.apache.org/jira/browse/PARQUET-2126
+  The old implementation stored a single global instance of each type of compressor and decompressor, which
+  broke thread safety. The solution here is to store one instance of each codec type per-thread.
+  Normally, one would use ThreadLocal<> here, but the release() method needs to iterate over all codecs
+  ever created, so we have to implement the per-thread management explicitly.
+  */
+  private final Map<Thread, Map<CompressionCodecName, BytesCompressor>> allCompressors = new ConcurrentHashMap<>();
+  private final Map<Thread, Map<CompressionCodecName, BytesDecompressor>> allDecompressors = new ConcurrentHashMap<>();
Review Comment:
It looks like we cannot remove the cached entry from the map when a thread exits. What is worse, the map will keep growing if there are tons of threads in a long-running instance. `getDecompressor` gets called only on a per-column-chunk basis, so I expect the frequency of the call would not be very high. Would a single `ConcurrentHashMap<CompressionCodecName, BytesDecompressor>()` be sufficient? If we really care about the regression of introducing ConcurrentHashMap when we are sure no concurrency will happen, we can add a new thread-unsafe CodecFactory implementation to do the job. Any thoughts? @theosib-amazon @shangxinli
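
The single-map alternative suggested above might look like the sketch below. The type names mirror the PR but are stand-ins, not the real parquet-mr classes, and the sketch only helps if the cached instances themselves tolerate concurrent use, which is the open question in this thread.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of one shared ConcurrentHashMap keyed by codec name, as suggested
// in the review. CompressionCodecName and BytesDecompressor are stand-in
// types for illustration only.
public class SingleMapFactory {
  enum CompressionCodecName { SNAPPY, GZIP, ZSTD }

  static class BytesDecompressor {
    final CompressionCodecName codec;
    BytesDecompressor(CompressionCodecName codec) { this.codec = codec; }
  }

  private final Map<CompressionCodecName, BytesDecompressor> decompressors =
      new ConcurrentHashMap<>();

  BytesDecompressor getDecompressor(CompressionCodecName name) {
    // computeIfAbsent is atomic per key: concurrent callers share one cached
    // instance instead of racing, and the map size is bounded by the number
    // of codec names rather than by the number of threads.
    return decompressors.computeIfAbsent(name, BytesDecompressor::new);
  }

  void release() {
    // All cached instances live in one map, so cleanup stays trivial.
    decompressors.clear();
  }

  public static void main(String[] args) {
    SingleMapFactory f = new SingleMapFactory();
    BytesDecompressor a = f.getDecompressor(CompressionCodecName.SNAPPY);
    BytesDecompressor b = f.getDecompressor(CompressionCodecName.SNAPPY);
    System.out.println("cached: " + (a == b)); // prints "cached: true"
  }
}
```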
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]