baeminbo commented on code in PR #17201:
URL: https://github.com/apache/beam/pull/17201#discussion_r845993595
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java:
##########
@@ -370,7 +373,7 @@ boolean bloomFilterMightContain(RandomAccessData keyBytes) {
position(rawChannel, footer.getBloomFilterPosition());
bloomFilter =
ScalableBloomFilterCoder.of().decode(Channels.newInputStream(rawChannel));
- indexPerShard = new HashMap<>();
+ indexPerShard = new ConcurrentHashMap<>();
Review Comment:
After writing up the `IllegalStateException` scenario above, I realized that
changing the type of `indexPerShard` to a thread-safe Map cannot prevent the
`IllegalStateException` at [`checkState(indexPerShard.get(K) ==
null)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L500-L503).
I added an additional
[commit](https://github.com/apache/beam/pull/17201/commits/b74fc99bee7e1aa11b38495d53ab0b3147c17b34)
to remove the check. It is unnecessary because the same condition is already
checked in the if-clause just before it.
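To illustrate why a thread-safe Map alone does not help, here is a minimal sketch that replays the problematic interleaving on a single thread so that it is deterministic. `needsIndex`, `storeIndex`, and the `String` value type are hypothetical stand-ins, not the actual `IsmReaderImpl` code; the sketch only assumes that the guard and the write are two separate map operations.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CheckThenActRace {
  // Hypothetical guard, standing in for the if-clause before the update.
  static boolean needsIndex(Map<Integer, String> indexPerShard, int shardId) {
    return indexPerShard.get(shardId) == null;
  }

  // Hypothetical update that still carries the precondition check.
  static void storeIndex(Map<Integer, String> indexPerShard, int shardId, String index) {
    if (indexPerShard.get(shardId) != null) {
      throw new IllegalStateException("Index for shard " + shardId + " already initialized");
    }
    indexPerShard.put(shardId, index);
  }

  // Replays the interleaving "A checks, B checks, A stores, B stores".
  // Returns true if the second store hits the IllegalStateException.
  static boolean secondWriterFails() {
    Map<Integer, String> indexPerShard = new ConcurrentHashMap<>();
    boolean aNeeds = needsIndex(indexPerShard, 7); // thread A passes the guard
    boolean bNeeds = needsIndex(indexPerShard, 7); // thread B passes it too
    if (aNeeds) {
      storeIndex(indexPerShard, 7, "index-from-A");
    }
    try {
      if (bNeeds) {
        storeIndex(indexPerShard, 7, "index-from-B");
      }
      return false;
    } catch (IllegalStateException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println("second writer fails: " + secondWriterFails());
  }
}
```

Each individual call on the `ConcurrentHashMap` is thread-safe, but the guard and the write together are not atomic, so the precondition can still fail for the second writer.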
In addition, because `initializeForKeyedRead` allows concurrent access, threads
can populate `indexPerShard` for different shardIds in parallel. The downside
is that the same key can be updated more than once if several threads try to
update it at the same time. We could synchronize at a finer grain (e.g. using
`computeIfAbsent` on `indexPerShard`), but that would make the code a little
more complex. Do you think we should optimize further?
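For reference, a rough sketch of what the `computeIfAbsent` variant could look like. The class, `loadIndex`, and the `String` value type are hypothetical and only illustrate the idea; the real index type and loading logic in `IsmReaderImpl` are different.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ComputeIfAbsentSketch {
  private final Map<Integer, String> indexPerShard = new ConcurrentHashMap<>();
  private final AtomicInteger loads = new AtomicInteger();

  // Hypothetical loader; counts invocations so duplicates would be visible.
  private String loadIndex(int shardId) {
    loads.incrementAndGet();
    return "index-" + shardId;
  }

  // ConcurrentHashMap applies the mapping function at most once per key.
  String indexFor(int shardId) {
    return indexPerShard.computeIfAbsent(shardId, this::loadIndex);
  }

  // Requests the same shard from several threads and reports how many loads ran.
  static int loadsAfterConcurrentReads(int threads, int shardId) {
    ComputeIfAbsentSketch reader = new ComputeIfAbsentSketch();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int i = 0; i < threads; i++) {
      pool.execute(() -> reader.indexFor(shardId));
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return reader.loads.get();
  }

  public static void main(String[] args) {
    System.out.println("loads: " + loadsAfterConcurrentReads(8, 3));
  }
}
```

One trade-off to keep in mind: while the mapping function runs, `ConcurrentHashMap` may block other updates that touch the same bin, so doing file I/O inside `computeIfAbsent` could reduce the parallelism across shardIds that the current code allows.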