[ https://issues.apache.org/jira/browse/PARQUET-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17631496#comment-17631496 ]

ASF GitHub Bot commented on PARQUET-2126:
-----------------------------------------

wgtmac commented on code in PR #959:
URL: https://github.com/apache/parquet-mr/pull/959#discussion_r1018707351


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##########
@@ -44,8 +45,15 @@ public class CodecFactory implements CompressionCodecFactory {
   protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections
       .synchronizedMap(new HashMap<String, CompressionCodec>());
 
-  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
-  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
+  /*
+  See: https://issues.apache.org/jira/browse/PARQUET-2126
+  The old implementation stored a single global instance of each type of compressor and decompressor, which
+  broke thread safety. The solution here is to store one instance of each codec type per-thread.
+  Normally, one would use ThreadLocal<> here, but the release() method needs to iterate over all codecs
+  ever created, so we have to implement the per-thread management explicitly.
+   */
+  private final Map<Thread, Map<CompressionCodecName, BytesCompressor>> allCompressors = new ConcurrentHashMap<>();

Review Comment:
   Correct me if I'm wrong, but the Parquet writer always creates a new
CodecFactory internally, so `getCompressor` should not suffer from any
thread-safety issue.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##########
@@ -44,8 +45,15 @@ public class CodecFactory implements CompressionCodecFactory {
   protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections
       .synchronizedMap(new HashMap<String, CompressionCodec>());
 
-  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
-  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
+  /*
+  See: https://issues.apache.org/jira/browse/PARQUET-2126
+  The old implementation stored a single global instance of each type of compressor and decompressor, which
+  broke thread safety. The solution here is to store one instance of each codec type per-thread.
+  Normally, one would use ThreadLocal<> here, but the release() method needs to iterate over all codecs
+  ever created, so we have to implement the per-thread management explicitly.
+   */
+  private final Map<Thread, Map<CompressionCodecName, BytesCompressor>> allCompressors = new ConcurrentHashMap<>();
+  private final Map<Thread, Map<CompressionCodecName, BytesDecompressor>> allDecompressors = new ConcurrentHashMap<>();

Review Comment:
   It looks like we cannot remove the cached entry from the map when a thread
exits. Worse, the map will grow without bound if there are many threads in a
long-running instance. `getDecompressor` is called only on a per-column-chunk
basis, so I expect the call frequency to be fairly low. Would a single
`ConcurrentHashMap<CompressionCodecName, BytesDecompressor>()` be sufficient?
If we really care about the regression from introducing ConcurrentHashMap in
cases where we are sure no concurrency will happen, we can add a new
thread-unsafe CodecFactory implementation for that purpose. Any thoughts?
@theosib-amazon @shangxinli
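
   To make the single-map alternative concrete, here is a minimal sketch,
not the actual parquet-mr code. `CodecName`, `Decompressor`, and `Cache` are
hypothetical stand-ins for `CompressionCodecName`, `BytesDecompressor`, and
the caching part of `CodecFactory`. It assumes one cached instance per codec
is acceptable, and it only makes the map itself safe for concurrent access;
whether a cached instance may be shared across threads is a separate question.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class SingleMapCodecCacheSketch {
  // Hypothetical stand-in for CompressionCodecName.
  enum CodecName { UNCOMPRESSED, SNAPPY, GZIP }

  // Hypothetical stand-in for BytesDecompressor.
  static final class Decompressor {
    final CodecName codec;
    volatile boolean released = false;

    Decompressor(CodecName codec) { this.codec = codec; }

    void release() { released = true; }
  }

  // One map keyed by codec name only, so it cannot grow with the thread count.
  static final class Cache {
    private final Map<CodecName, Decompressor> decompressors = new ConcurrentHashMap<>();
    final AtomicInteger created = new AtomicInteger();

    Decompressor getDecompressor(CodecName name) {
      // computeIfAbsent applies the mapping function at most once per key.
      return decompressors.computeIfAbsent(name, n -> {
        created.incrementAndGet();
        return new Decompressor(n);
      });
    }

    // release() can still visit every instance ever cached.
    void release() {
      for (Decompressor d : decompressors.values()) {
        d.release();
      }
      decompressors.clear();
    }

    int size() { return decompressors.size(); }
  }

  public static void main(String[] args) throws InterruptedException {
    Cache cache = new Cache();
    Thread[] threads = new Thread[8];
    for (int i = 0; i < threads.length; i++) {
      threads[i] = new Thread(() -> cache.getDecompressor(CodecName.GZIP));
      threads[i].start();
    }
    for (Thread t : threads) {
      t.join();
    }
    // The entry count depends on the codecs requested, not on the threads.
    System.out.println("entries=" + cache.size() + " created=" + cache.created.get());
    cache.release();
  }
}
```

   With this layout the map is bounded by the number of codec names, and
`release()` keeps its ability to iterate over everything that was cached.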





> Thread safety bug in CodecFactory
> ---------------------------------
>
>                 Key: PARQUET-2126
>                 URL: https://issues.apache.org/jira/browse/PARQUET-2126
>             Project: Parquet
>          Issue Type: Bug
>          Components: parquet-mr
>    Affects Versions: 1.12.2
>            Reporter: James Turton
>            Priority: Major
>
> The code for returning Compressor objects to the caller goes to some lengths 
> to achieve thread safety, including keeping Codec objects in an Apache 
> Commons pool that has thread-safe borrow semantics.  This is all undone by 
> the BytesCompressor and BytesDecompressor Maps in 
> org.apache.parquet.hadoop.CodecFactory which end up caching single compressor 
> and decompressor instances due to code in CodecFactory@getCompressor and 
> CodecFactory@getDecompressor.  When the caller runs multiple threads, those 
> threads end up sharing compressor and decompressor instances.
> For compressors based on Xerial Snappy this bug has no effect because that 
> library is itself thread safe.  But when BuiltInGzipCompressor from Hadoop is 
> selected for the CompressionCodecName.GZIP case, serious problems ensue.  
> That class is not thread safe and sharing one instance of it between threads 
> produces both silent data corruption and JVM crashes.
> To fix this situation, parquet-mr should stop caching single compressor and 
> decompressor instances.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
