baeminbo commented on code in PR #17201:
URL: https://github.com/apache/beam/pull/17201#discussion_r845915573
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java:
##########
@@ -370,7 +373,7 @@ boolean bloomFilterMightContain(RandomAccessData keyBytes) {
position(rawChannel, footer.getBloomFilterPosition());
bloomFilter =
ScalableBloomFilterCoder.of().decode(Channels.newInputStream(rawChannel));
- indexPerShard = new HashMap<>();
+ indexPerShard = new ConcurrentHashMap<>();
Review Comment:
`indexPerShard` is updated at
[initializeForKeyedRead](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L562),
which is not synchronized.
#### IllegalStateException
Assume that threads T1 and T2 both call `overKeyComponents` and enter
`initializeForKeyedRead` for shardId `K` at the same time, and that
`indexPerShard` does not yet contain `K`:
1. T1 checks
[`indexPerShard.containsKey(K)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L495)
and it returns `false`.
2. T2 checks
[`indexPerShard.containsKey(K)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L495)
and it returns `false`.
3. T1 advances quickly and invokes [`indexPerShard.put(K,
...)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L562).
Now, `indexPerShard` has `K`.
4. T2 reaches [`checkState(indexPerShard.get(K) ==
null)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L500-L503),
which throws `IllegalStateException` because `K` is now present.
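
To make the interleaving above concrete, here is a minimal sketch that replays it deterministically in a single thread. The class name, the `String` value type, and the shard id `7` are hypothetical stand-ins, not the real `IsmReaderImpl` types. The second method shows one possible way to make the check and the insert a single atomic step, assuming the map is a `ConcurrentHashMap`; whether that fits `initializeForKeyedRead` is not established here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class CheckThenActSketch {
  // Hypothetical stand-in for the per-shard index value.
  static final String INDEX = "index-for-K";

  /** Replays the interleaving; returns true if T2's state check would fail. */
  static boolean replayInterleaving() {
    Map<Integer, String> indexPerShard = new HashMap<>();
    int k = 7;
    boolean t1SawK = indexPerShard.containsKey(k); // T1 checks: false
    boolean t2SawK = indexPerShard.containsKey(k); // T2 checks: false
    if (!t1SawK) {
      indexPerShard.put(k, INDEX); // T1 advances and inserts K
    }
    // T2 still believes K is absent, but get(K) is no longer null,
    // so a check like checkState(indexPerShard.get(K) == null) fails.
    return !t2SawK && indexPerShard.get(k) != null;
  }

  /** Counts loader runs when two callers use computeIfAbsent for the same key. */
  static int loadsWithComputeIfAbsent() {
    ConcurrentHashMap<Integer, String> indexPerShard = new ConcurrentHashMap<>();
    int[] loads = {0};
    for (int caller = 0; caller < 2; caller++) {
      // The check and the insert happen as one atomic operation per key.
      indexPerShard.computeIfAbsent(7, key -> {
        loads[0]++;
        return INDEX;
      });
    }
    return loads[0];
  }

  public static void main(String[] args) {
    System.out.println("state check fails: " + replayInterleaving());
    System.out.println("loader runs: " + loadsWithComputeIfAbsent());
  }
}
```

Note that swapping `HashMap` for `ConcurrentHashMap` does not by itself remove this race, because `containsKey` followed by `put` remains two separate operations.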
#### NullPointerException
Assume that threads T1 and T2 both call `overKeyComponents`, T1 for
shardId `K1` and T2 for shardId `K2`, and that `indexPerShard` already contains `K1`:
1. T1 returns from `initializeForKeyedRead` because
[`indexPerShard.containsKey(K1)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L495)
is `true`.
2. T1 pauses just before
[`indexPerShard.get(K1)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L210)
in `overKeyComponents`.
3. T2 advances and invokes [`indexPerShard.put(K2,
...)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L562)
in `initializeForKeyedRead`.
4. T1 can get `null` from `indexPerShard.get(K1)` and then throws
`NullPointerException` when it tries to invoke
[`floorKey`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L210)
on the result.
My theory on this is that
* T2 is resizing the `HashMap` (in JDK 8 this is `HashMap.resize()`; the link
goes to the analogous
[`Hashtable.rehash()`](https://hg.openjdk.java.net/jdk8/jdk8/jdk/file/tip/src/share/classes/java/util/Hashtable.java#l389)),
where a `newMap` is created and then the entries in `oldMap` are copied to
`newMap`.
* T1 accesses `newMap` before the copy finishes, so it sees `null` for `K1`,
which exists in `oldMap` but not yet in `newMap`.
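
A rough way to sanity-check the `ConcurrentHashMap` change in the diff is the sketch below, where a reader thread (playing T1) repeatedly reads an existing key while the calling thread (playing T2) inserts many other keys and forces repeated resizes. The class, method, and key values are hypothetical. It relies on the documented `ConcurrentHashMap` behavior that retrievals reflect completed updates, so a key that is already present should never be observed as missing.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class ConcurrentReadSketch {
  /** Returns how many times the reader saw null for a key that was already present. */
  static int countNullReads(Map<Integer, String> indexPerShard, int inserts) {
    final int k1 = -1; // hypothetical shard id that exists before any resize
    indexPerShard.put(k1, "index-for-K1");
    AtomicBoolean done = new AtomicBoolean(false);
    AtomicInteger nullReads = new AtomicInteger();

    // T1: keeps reading K1 until the writer is finished (at least one read).
    Thread reader = new Thread(() -> {
      do {
        if (indexPerShard.get(k1) == null) {
          nullReads.incrementAndGet();
        }
      } while (!done.get());
    });
    reader.start();

    // T2: inserts other shard ids, which grows the table several times.
    for (int k2 = 0; k2 < inserts; k2++) {
      indexPerShard.put(k2, "index-for-K2");
    }
    done.set(true);
    try {
      reader.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return nullReads.get();
  }

  public static void main(String[] args) {
    int nulls = countNullReads(new ConcurrentHashMap<Integer, String>(), 200_000);
    System.out.println("null reads with ConcurrentHashMap: " + nulls);
  }
}
```

Passing a plain `HashMap` instead is outside its thread-safety contract; it may or may not show `null` reads on a given run, which would be consistent with the theory above without proving it.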