baeminbo commented on code in PR #17201:
URL: https://github.com/apache/beam/pull/17201#discussion_r845993595


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java:
##########
@@ -370,7 +373,7 @@ boolean bloomFilterMightContain(RandomAccessData keyBytes) {
     position(rawChannel, footer.getBloomFilterPosition());
     bloomFilter = ScalableBloomFilterCoder.of().decode(Channels.newInputStream(rawChannel));
 
-    indexPerShard = new HashMap<>();
+    indexPerShard = new ConcurrentHashMap<>();

Review Comment:
   After writing up the `IllegalStateException` scenario above, I realized that changing the type of `indexPerShard` to a thread-safe Map cannot prevent the `IllegalStateException` at [`checkState(indexPerShard.get(K) == null)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L500-L503). A thread-safe Map makes each individual `get` and `put` atomic, but not the check-then-put sequence as a whole, so two threads can both see `null` before either one writes.
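   To make the race concrete, here is a minimal, self-contained sketch (not the actual `IsmReaderImpl` code). The map, the `initialize`/`worker` helpers and the `"index-" + shardId` placeholder are hypothetical, and the `CyclicBarrier`/`CountDownLatch` only exist to force the bad interleaving deterministically. Even with a `ConcurrentHashMap`, both threads pass the `get(...) == null` check, and the one that re-checks after the other has written trips the `checkState`-style guard.

```java
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class CheckThenActDemo {
  // Hypothetical stand-in for indexPerShard (shardId -> loaded index).
  static final Map<Integer, String> indexPerShard = new ConcurrentHashMap<>();

  // Check-then-act shape: "if absent { load; checkState(absent); put }".
  static void initialize(int shardId, CyclicBarrier bothChecked,
      CountDownLatch waitBeforeRecheck, CountDownLatch signalAfterPut)
      throws InterruptedException, BrokenBarrierException {
    if (indexPerShard.get(shardId) == null) {
      // Test-only hook: both threads pass the null check before either writes.
      bothChecked.await();
      if (waitBeforeRecheck != null) {
        waitBeforeRecheck.await(); // second thread re-checks only after the first put
      }
      String index = "index-" + shardId; // pretend to read the shard's index
      // Equivalent of checkState(indexPerShard.get(shardId) == null).
      if (indexPerShard.get(shardId) != null) {
        throw new IllegalStateException("index for shard " + shardId + " already loaded");
      }
      indexPerShard.put(shardId, index);
      if (signalAfterPut != null) {
        signalAfterPut.countDown();
      }
    }
  }

  static Thread worker(CyclicBarrier bothChecked, CountDownLatch waitBeforeRecheck,
      CountDownLatch signalAfterPut, AtomicInteger failures) {
    return new Thread(() -> {
      try {
        initialize(7, bothChecked, waitBeforeRecheck, signalAfterPut);
      } catch (IllegalStateException e) {
        failures.incrementAndGet(); // the checkState-style failure
      } catch (InterruptedException | BrokenBarrierException e) {
        throw new RuntimeException(e);
      }
    });
  }

  /** Runs two threads on the same shard; returns how many hit the checkState failure. */
  public static int run() throws InterruptedException {
    indexPerShard.clear();
    AtomicInteger failures = new AtomicInteger();
    CyclicBarrier bothChecked = new CyclicBarrier(2);
    CountDownLatch firstPutDone = new CountDownLatch(1);
    Thread first = worker(bothChecked, null, firstPutDone, failures);
    Thread second = worker(bothChecked, firstPutDone, null, failures);
    first.start();
    second.start();
    first.join();
    second.join();
    return failures.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("checkState failures: " + run()); // prints 1
  }
}
```

   Swapping `ConcurrentHashMap` for `HashMap` here would not change the outcome of the guard: the problem is the gap between the check and the write, not the map itself.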
   
   I added an additional [commit](https://github.com/apache/beam/pull/17201/commits/b74fc99bee7e1aa11b38495d53ab0b3147c17b34) to remove the check. It is not necessary because the same condition is already tested by the if-clause just before it.
   
   In addition, because `initializeForKeyedRead` allows concurrent access, `indexPerShard` can be populated for multiple shardIds in parallel, which is a benefit. The downside is that the same key can be updated multiple times if several threads try to update it at the same time. We could control the synchronization more finely (e.g. by using `computeIfAbsent` on `indexPerShard`), but that may make the code a bit more complex. Do you think we should do more optimization?


