[ 
https://issues.apache.org/jira/browse/BEAM-14187?focusedWorklogId=754474&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-754474
 ]

ASF GitHub Bot logged work on BEAM-14187:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Apr/22 09:19
            Start Date: 08/Apr/22 09:19
    Worklog Time Spent: 10m 
      Work Description: baeminbo commented on code in PR #17201:
URL: https://github.com/apache/beam/pull/17201#discussion_r845915573


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java:
##########
@@ -370,7 +373,7 @@ boolean bloomFilterMightContain(RandomAccessData keyBytes) {
     position(rawChannel, footer.getBloomFilterPosition());
     bloomFilter = 
ScalableBloomFilterCoder.of().decode(Channels.newInputStream(rawChannel));
 
-    indexPerShard = new HashMap<>();
+    indexPerShard = new ConcurrentHashMap<>();

Review Comment:
   `indexPerShard` is updated at 
[initializeForKeyedRead](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L562),
 which is not synchronized. 
   
   #### IllegalStateException
   Let's assume that threads T1 and T2 call `overKeyComponents` and enter 
`initializeForKeyedRead` for shardId `K` at the same time, and `indexPerShard` 
doesn't have `K`:
   
   1. T1 checks 
[`indexPerShard.containsKey(K)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L495)
 and it returns `false`. 
   2. T2 checks 
[`indexPerShard.containsKey(K)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L495)
 and it returns `false`.
   3. T1 advances quickly and invokes [`indexPerShard.put(K, 
...)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L562).
 Now, `indexPerShard` has `K`.
   4. T2 checks [`checkState(indexPerShard.get(K) == 
null)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L500-L503).
 It throws `IllegalStateException`.
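
   The interleaving above can be replayed deterministically on a single thread. This is only a minimal sketch, not the `IsmReaderImpl` code: the class name, the `String` value type, and the local `checkState` helper (standing in for Guava's `Preconditions.checkState`) are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class CheckThenActReplay {
  /** Replays steps 1-4 on one thread; returns true if T2's state check fails. */
  static boolean t2CheckFails(int shardId) {
    // Hypothetical stand-in for indexPerShard; String replaces the real index type.
    Map<Integer, String> indexPerShard = new HashMap<>();

    boolean t1SawKey = indexPerShard.containsKey(shardId); // step 1: T1 sees no entry
    boolean t2SawKey = indexPerShard.containsKey(shardId); // step 2: T2 sees no entry

    if (!t1SawKey) {
      indexPerShard.put(shardId, "index built by T1"); // step 3: T1 publishes the index
    }
    if (!t2SawKey) {
      // Step 4: T2 still acts on its stale containsKey result.
      try {
        checkState(indexPerShard.get(shardId) == null);
      } catch (IllegalStateException e) {
        return true;
      }
    }
    return false;
  }

  // Local stand-in for Guava's Preconditions.checkState (an assumption of this sketch).
  static void checkState(boolean expression) {
    if (!expression) {
      throw new IllegalStateException("index already initialized");
    }
  }

  public static void main(String[] args) {
    System.out.println("T2 check fails: " + t2CheckFails(1));
  }
}
```

   The failure comes from the gap between the `containsKey` check and the later `checkState`, so it does not depend on which `Map` implementation is used.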
   
   #### NullPointerException
   Let's assume that threads T1 and T2 call `overKeyComponents`. T1 is for 
shardId `K1` while T2 is for shardId `K2`, and `indexPerShard` has `K1`:
   
   1. T1 returns from `initializeForKeyedRead` as 
[`indexPerShard.containsKey(K1)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L495)
 is `true`.
   2. T1 stops at 
[`indexPerShard.get(K1)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L210)
  at `overKeyComponents`.
   3. T2 advances and invokes [`indexPerShard.put(K2, 
...)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L562)
 at `initializeForKeyedRead`.
   4. T1 can get `null` from `indexPerShard.get(K1)`, and will throw 
`NullPointerException` as it tries to invoke 
[`floorKey`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L210).
   
   My theory on this is that 
   * T2 is resizing the `HashMap` (`HashMap.resize()` in JDK 8, which follows 
the same pattern as the linked 
[`Hashtable.rehash()`](https://hg.openjdk.java.net/jdk8/jdk8/jdk/file/tip/src/share/classes/java/util/Hashtable.java#l389)),
 where a new table is created and published, and then the entries of the old 
table are copied into it.
   * T1 accesses the new table before the copy finishes, so it sees `null` for 
`K1`, which exists in the old table but not yet in the new one.
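
   One way to rule out both interleavings is to make the lookup and the initialization a single atomic step. The sketch below assumes a hypothetical `ShardIndexCache` class with a `String` placeholder for the index. It is not the change made in this PR. It relies on `ConcurrentHashMap.computeIfAbsent`, which applies the mapping function at most once per key. Note that a concurrent map alone does not make a separate `containsKey` check followed by `put` atomic.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class ShardIndexCache {
  // Hypothetical stand-in for indexPerShard; String replaces the real index type.
  private final ConcurrentMap<Integer, String> indexPerShard = new ConcurrentHashMap<>();
  // Counts how many times an index was actually built.
  final AtomicInteger loads = new AtomicInteger();

  /** Returns the index for a shard, building it at most once per shard. */
  String indexFor(int shardId) {
    // The check and the insertion happen atomically inside computeIfAbsent.
    return indexPerShard.computeIfAbsent(shardId, id -> {
      loads.incrementAndGet();
      return "index for shard " + id; // placeholder for reading the real index
    });
  }

  /** Calls indexFor(shardId) from several threads at once; returns how many saw null. */
  static int hammer(ShardIndexCache cache, int shardId, int threads) {
    CountDownLatch start = new CountDownLatch(1);
    AtomicInteger nulls = new AtomicInteger();
    Thread[] workers = new Thread[threads];
    for (int i = 0; i < threads; i++) {
      workers[i] = new Thread(() -> {
        try {
          start.await(); // line all workers up so they race on the same key
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        if (cache.indexFor(shardId) == null) {
          nulls.incrementAndGet();
        }
      });
      workers[i].start();
    }
    start.countDown();
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted while waiting for workers", e);
      }
    }
    return nulls.get();
  }

  public static void main(String[] args) {
    ShardIndexCache cache = new ShardIndexCache();
    int nulls = hammer(cache, 7, 8);
    System.out.println("null reads: " + nulls + ", loads: " + cache.loads.get());
  }
}
```

   With this shape, a reader never observes a missing entry for a shard it has asked for, and two threads racing on the same shard build the index only once.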
   





Issue Time Tracking
-------------------

    Worklog Id:     (was: 754474)
    Time Spent: 1h  (was: 50m)

> NullPointerException or IllegalStateException at IsmReaderImpl in Dataflow
> --------------------------------------------------------------------------
>
>                 Key: BEAM-14187
>                 URL: https://issues.apache.org/jira/browse/BEAM-14187
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>            Reporter: Minbo Bae
>            Priority: P2
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> h6. Problem
> Dataflow Java batch jobs with large side inputs intermittently throw 
> {{NullPointerException}} or {{IllegalStateException}}.
>  * 
> [NullPointerException|https://gist.githubusercontent.com/baeminbo/459e283eadbc7752c9f23616b52d958a/raw/f0480b8eaff590fb3f3ae2ab98ddce7dd3b4a237/npe.png]
>  happens at 
> [IsmReaderImpl.overKeyComponents|https://github.com/apache/beam/blob/v2.37.0/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L217]:
>  * 
> [IllegalStateException|https://gist.githubusercontent.com/baeminbo/459e283eadbc7752c9f23616b52d958a/raw/f0480b8eaff590fb3f3ae2ab98ddce7dd3b4a237/IllegalStateException.png]
>  happens at 
> [IsmReaderImpl.initializeForKeyedRead|https://github.com/apache/beam/blob/v2.37.0/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L500].
> (All error logs in the Dataflow job are 
> [here|https://gist.githubusercontent.com/baeminbo/459e283eadbc7752c9f23616b52d958a/raw/f0480b8eaff590fb3f3ae2ab98ddce7dd3b4a237/downloaded-logs-20220327-171955.json].)
> h6. Hypothesis
> {{initializeForKeyedRead}} is not synchronized. Multiple threads can enter 
> the method at the same time, initialize the index for the same shard, and 
> update {{indexPerShard}} without synchronization. {{overKeyComponents}} also 
> accesses {{indexPerShard}} without synchronization. As {{indexPerShard}} is a 
> plain {{HashMap}}, which is not thread-safe, this can cause the 
> {{NullPointerException}} and {{IllegalStateException}} above.
> h6. Suggestion
> I think changing the type of {{indexPerShard}} to a thread-safe map 
> (e.g. {{ConcurrentHashMap}}) would fix this issue.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
