[ https://issues.apache.org/jira/browse/BEAM-14187?focusedWorklogId=754474&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-754474 ]
ASF GitHub Bot logged work on BEAM-14187:
-----------------------------------------

Author: ASF GitHub Bot
Created on: 08/Apr/22 09:19
Start Date: 08/Apr/22 09:19
Worklog Time Spent: 10m
Work Description: baeminbo commented on code in PR #17201:
URL: https://github.com/apache/beam/pull/17201#discussion_r845915573

##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java:
##########
@@ -370,7 +373,7 @@ boolean bloomFilterMightContain(RandomAccessData keyBytes) {
       position(rawChannel, footer.getBloomFilterPosition());
       bloomFilter = ScalableBloomFilterCoder.of().decode(Channels.newInputStream(rawChannel));
-      indexPerShard = new HashMap<>();
+      indexPerShard = new ConcurrentHashMap<>();

Review Comment:
`indexPerShard` is updated at [initializeForKeyedRead](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L562), which is not synchronized.

#### IllegalStateException

Let's assume that threads T1 and T2 call `overKeyComponents` and enter `initializeForKeyedRead` for shardId `K` at the same time, and that `indexPerShard` doesn't have `K`:

1. T1 checks [`indexPerShard.containsKey(K)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L495) and it returns `false`.
2. T2 checks [`indexPerShard.containsKey(K)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L495) and it returns `false`.
3. T1 advances quickly and invokes [`indexPerShard.put(K, ...)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L562). Now, `indexPerShard` has `K`.
4. T2 checks [`checkState(indexPerShard.get(K) == null)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L500-L503). It throws `IllegalStateException`.

#### NullPointerException

Let's assume that threads T1 and T2 call `overKeyComponents`. T1 is for shardId `K1` while T2 is for shardId `K2`, and `indexPerShard` has `K1`:

1. T1 returns from `initializeForKeyedRead` as [`indexPerShard.containsKey(K1)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L495) is `true`.
2. T1 stops at [`indexPerShard.get(K1)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L210) in `overKeyComponents`.
3. T2 advances and invokes [`indexPerShard.put(K2, ...)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L562) in `initializeForKeyedRead`.
4. T1 can get `null` from `indexPerShard.get(K1)`, and will throw `NullPointerException` as it tries to invoke [`floorKey`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L210).

My theory on this is that

* T2 is resizing the map inside `put` (`HashMap.resize()` in JDK 8; the analogous [`Hashtable.rehash()`](https://hg.openjdk.java.net/jdk8/jdk8/jdk/file/tip/src/share/classes/java/util/Hashtable.java#l389) has the same shape), where a `newMap` is created and then entries in `oldMap` are copied to `newMap`.
* T1 accesses `newMap` before the copy finishes. So, it sees `null` for `K1`, which exists in `oldMap` but not yet in `newMap`.
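
The check-then-act race in the `IllegalStateException` scenario can be illustrated outside Beam. The sketch below is not the `IsmReaderImpl` code: `ShardIndexSketch`, `indexFor`, `loadIndex` and `runConcurrently` are hypothetical names, and the index contents are placeholders. It also swaps in `ConcurrentHashMap.computeIfAbsent`, which is a different technique from the one-line change in the diff above. A thread-safe map makes each individual `get` or `put` safe, but a separate `containsKey` followed by `put` is still two steps; `computeIfAbsent` folds the check and the insert into one atomic call.

```java
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a per-shard index cache; not the IsmReaderImpl code.
class ShardIndexSketch {
  private final ConcurrentMap<Integer, NavigableMap<String, Long>> indexPerShard =
      new ConcurrentHashMap<>();
  private final AtomicInteger loads = new AtomicInteger();

  // Atomic replacement for "if (!containsKey(k)) { ...; put(k, v); }".
  NavigableMap<String, Long> indexFor(int shardId) {
    return indexPerShard.computeIfAbsent(shardId, this::loadIndex);
  }

  // Placeholder for reading a shard's index; the entries are made up.
  private NavigableMap<String, Long> loadIndex(int shardId) {
    loads.incrementAndGet();
    NavigableMap<String, Long> index = new TreeMap<>();
    index.put("a", 0L);
    index.put("m", 100L);
    return index;
  }

  int loadCount() {
    return loads.get();
  }

  // Starts `threads` threads that all request the same shard at once and
  // returns how many times the loader ran.
  static int runConcurrently(int threads, int shardId) {
    ShardIndexSketch sketch = new ShardIndexSketch();
    CountDownLatch start = new CountDownLatch(1);
    Thread[] workers = new Thread[threads];
    for (int i = 0; i < threads; i++) {
      workers[i] = new Thread(() -> {
        try {
          start.await();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        // computeIfAbsent returns the mapped index, so floorKey has a non-null receiver.
        sketch.indexFor(shardId).floorKey("k");
      });
      workers[i].start();
    }
    start.countDown();
    for (Thread t : workers) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    return sketch.loadCount();
  }

  public static void main(String[] args) {
    System.out.println("loads = " + runConcurrently(8, 7));
  }
}
```

With this shape the loader runs once per shard no matter how many threads arrive together, so there is no window in which a second thread finds the key already present after its own check. One trade-off to keep in mind: the mapping function runs while the map holds a lock for that key's bin, so it should not touch `indexPerShard` itself.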

Issue Time Tracking
-------------------

Worklog Id: (was: 754474)
Time Spent: 1h (was: 50m)

> NullPointerException or IllegalStateException at IsmReaderImpl in Dataflow
> --------------------------------------------------------------------------
>
> Key: BEAM-14187
> URL: https://issues.apache.org/jira/browse/BEAM-14187
> Project: Beam
> Issue Type: Bug
> Components: runner-dataflow
> Reporter: Minbo Bae
> Priority: P2
> Time Spent: 1h
> Remaining Estimate: 0h
>
> h6. Problem
> Dataflow Java batch jobs with a large side input intermittently throw
> {{NullPointerException}} or {{IllegalStateException}}.
> * [NullPointerException|https://gist.githubusercontent.com/baeminbo/459e283eadbc7752c9f23616b52d958a/raw/f0480b8eaff590fb3f3ae2ab98ddce7dd3b4a237/npe.png]
> happens at
> [IsmReaderImpl.overKeyComponents|https://github.com/apache/beam/blob/v2.37.0/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L217].
> * [IllegalStateException|https://gist.githubusercontent.com/baeminbo/459e283eadbc7752c9f23616b52d958a/raw/f0480b8eaff590fb3f3ae2ab98ddce7dd3b4a237/IllegalStateException.png]
> happens at
> [IsmReaderImpl.initializeForKeyedRead|https://github.com/apache/beam/blob/v2.37.0/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L500].
> (All error logs in the Dataflow job are
> [here|https://gist.githubusercontent.com/baeminbo/459e283eadbc7752c9f23616b52d958a/raw/f0480b8eaff590fb3f3ae2ab98ddce7dd3b4a237/downloaded-logs-20220327-171955.json].)
> h6. Hypothesis
> {{initializeForKeyedRead}} is not synchronized. Multiple threads can
> enter the method, initialize the index for the same shard, and update
> {{indexPerShard}} without synchronization. {{overKeyComponents}}
> also accesses {{indexPerShard}} without synchronization. Because {{indexPerShard}}
> is a plain {{HashMap}}, which is not thread-safe, this can cause the
> {{NullPointerException}} and {{IllegalStateException}} above.
> h6. Suggestion
> I think we can fix this issue by changing the type of {{indexPerShard}} to a
> thread-safe map (e.g. {{ConcurrentHashMap}}).

--
This message was sent by Atlassian Jira
(v8.20.1#820001)