[ 
https://issues.apache.org/jira/browse/BEAM-14187?focusedWorklogId=755353&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-755353
 ]

ASF GitHub Bot logged work on BEAM-14187:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Apr/22 17:55
            Start Date: 11/Apr/22 17:55
    Worklog Time Spent: 10m 
      Work Description: lukecwik commented on code in PR #17201:
URL: https://github.com/apache/beam/pull/17201#discussion_r847592739


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java:
##########
@@ -502,30 +513,65 @@ private Optional<SeekableByteChannel> initializeForKeyedRead(
 
     inChannel = initializeBloomFilterAndIndexPerShard(inChannel);
 
-    checkState(indexPerShard != null,
+    checkState(
+        indexPerShard != null,
         "indexPerShard must not be null after initializeBloomFilterAndIndexPerShard.");
 
-    // If the index has been populated and contains the shard id, we can return.
-    if (indexPerShard != null && indexPerShard.containsKey(shardId)) {
-      checkState(bloomFilter != null, "Bloom filter expected to have been initialized.");
+    if (indexPerShard.containsKey(shardId)) {
+      // ConcurrentHashMap.containsKey doesn't lock a bin, so it's faster than `computeIfAbsent` for
+      // a present key.
       return inChannel;
     }
 
-    Long startOfNextBlock = shardOffsetToShardMap.higherKey(shardWithIndex.getBlockOffset());
-    // If this is the last block, then we need to grab the position of the Bloom filter
-    // as the upper bound.
-    if (startOfNextBlock == null) {
-      startOfNextBlock = footer.getBloomFilterPosition();
-    }
+    // Using AtomicReference for an object holder. Actually, atomicity is not required.
+    AtomicReference<SeekableByteChannel> rawChannelReference =
+        new AtomicReference<>(inChannel.orNull());
+
+    // JDK-8161372 (ConcurrentHashMap.computeIfAbsent locks bin when k present) alleviated the
+    // performance loss by not acquiring lock for the first node. But, the fix was applied to Java9,
+    // and it still has chance to lock the bin containing the key at the second or next nodes. As we
+    // expect `indexPerShard` already has the shardId in most cases, it would have a better
+    // performance to check `containsKey` above before invoking `computeIfAbsent` here.
+    indexPerShard.computeIfAbsent(
+        shardId,
+        ignored -> {
+          Long startOfNextBlock = shardOffsetToShardMap.higherKey(shardWithIndex.getBlockOffset());
+          // If this is the last block, then we need to grab the position of the Bloom filter
+          // as the upper bound.
+          if (startOfNextBlock == null) {
+            startOfNextBlock = footer.getBloomFilterPosition();
+          }
+
+          checkState(
+              shardWithIndex.getIndexOffset() < startOfNextBlock,
+              "Expected the index start offset is less than the next block start offset. "
+                  + "But, IsmShard is '%s' and the next block offset is %s for resourceId '%s'",
+              shardWithIndex,
+              startOfNextBlock,
+              resourceId);
+
+          try {
+            SeekableByteChannel rawChannel = openIfNeeded(Optional.of(rawChannelReference.get()));
+            rawChannelReference.set(rawChannel);
+            return readIndexBlockForShard(resourceId, shardWithIndex, startOfNextBlock, rawChannel);
+          } catch (IOException e) {
+            // Wrapping with RuntimeException
+            throw new RuntimeException(
+                "failed to read shard index for resourceId: " + resourceId + " shardId: " + shardId,

Review Comment:
   ```suggestion
                   "Failed to read shard index for resourceId: " + resourceId + 
" shardId: " + shardId,
   ```
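
For context on the pattern the patch relies on, here is a minimal, self-contained sketch. The class and method names (`ShardIndexCache`, `loadIndexBlock`) are made up for illustration and are not part of IsmReaderImpl: a lock-free read serves as the fast path, and `ConcurrentHashMap.computeIfAbsent` is only reached when the entry is missing, since on pre-Java-9 JVMs `computeIfAbsent` may lock the key's bin even when the entry already exists (JDK-8161372).

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative only: ShardIndexCache and loadIndexBlock are hypothetical names,
// not the actual IsmReaderImpl code.
public class ShardIndexCache {
  private final ConcurrentMap<Integer, String> indexPerShard = new ConcurrentHashMap<>();

  public String indexFor(int shardId) {
    // Fast path: get()/containsKey() never lock a bin, so when the shard index
    // is already present (the common case) we avoid computeIfAbsent entirely.
    String existing = indexPerShard.get(shardId);
    if (existing != null) {
      return existing;
    }
    // Slow path: computeIfAbsent runs the loader at most once per shard, even
    // if several threads race to initialize the same entry. Before Java 9 it
    // could lock the bin even for a present key (JDK-8161372), hence the fast path.
    return indexPerShard.computeIfAbsent(shardId, ignored -> loadIndexBlock(shardId));
  }

  private String loadIndexBlock(int shardId) {
    // Stand-in for the real work of reading an index block from the Ism file.
    return "index-for-shard-" + shardId;
  }
}
```

The patch above follows the same shape: `containsKey` as the fast path, with the index-block read performed inside the `computeIfAbsent` mapping function.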





Issue Time Tracking
-------------------

    Worklog Id:     (was: 755353)
    Time Spent: 2h 40m  (was: 2.5h)

> NullPointerException or IllegalStateException at IsmReaderImpl in Dataflow
> --------------------------------------------------------------------------
>
>                 Key: BEAM-14187
>                 URL: https://issues.apache.org/jira/browse/BEAM-14187
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>            Reporter: Minbo Bae
>            Priority: P2
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> h6. Problem
> Dataflow Java batch jobs with large side inputs intermittently throw 
> {{NullPointerException}} or {{IllegalStateException}}.
>  * 
> [NullPointerException|https://gist.githubusercontent.com/baeminbo/459e283eadbc7752c9f23616b52d958a/raw/f0480b8eaff590fb3f3ae2ab98ddce7dd3b4a237/npe.png]
>  happens at 
> [IsmReaderImpl.overKeyComponents|https://github.com/apache/beam/blob/v2.37.0/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L217]:
>  * 
> [IllegalStateException|https://gist.githubusercontent.com/baeminbo/459e283eadbc7752c9f23616b52d958a/raw/f0480b8eaff590fb3f3ae2ab98ddce7dd3b4a237/IllegalStateException.png]
>  happens at 
> [IsmReaderImpl.initializeForKeyedRead|https://github.com/apache/beam/blob/v2.37.0/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L500].
> (all error logs for the Dataflow job are 
> [here|https://gist.githubusercontent.com/baeminbo/459e283eadbc7752c9f23616b52d958a/raw/f0480b8eaff590fb3f3ae2ab98ddce7dd3b4a237/downloaded-logs-20220327-171955.json].)
> h6. Hypothesis
> The {{initializeForKeyedRead}} method is not synchronized, so multiple threads 
> can enter it, initialize the index for the same shard, and update 
> {{indexPerShard}} without synchronization. {{overKeyComponents}} also accesses 
> {{indexPerShard}} without synchronization. Because {{indexPerShard}} is just a 
> {{HashMap}}, which is not thread-safe, this can cause the 
> {{NullPointerException}} and {{IllegalStateException}} above.
> h6. Suggestion
> I think this issue can be fixed by changing the type of {{indexPerShard}} to a 
> thread-safe map (e.g. {{ConcurrentHashMap}}).
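
A minimal sketch of the suggested change, assuming a simplified value type and loader (the real field in IsmReaderImpl has different generics; the ones here are placeholders):

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Illustrative only: the value type and loader are simplified stand-ins.
class ShardIndexHolder<V> {
  // Before: a plain HashMap, which is unsafe when initializeForKeyedRead and
  // overKeyComponents touch it from different threads.
  // private final Map<Integer, NavigableMap<Long, V>> indexPerShard = new HashMap<>();

  // After: ConcurrentHashMap makes concurrent reads and per-shard initialization safe.
  private final Map<Integer, NavigableMap<Long, V>> indexPerShard = new ConcurrentHashMap<>();

  NavigableMap<Long, V> indexFor(int shardId, Function<Integer, NavigableMap<Long, V>> loader) {
    // computeIfAbsent initializes each shard's index at most once, even under contention.
    return indexPerShard.computeIfAbsent(shardId, loader);
  }
}
```

With a ConcurrentHashMap, `computeIfAbsent` gives at-most-once initialization per shard, which is the behavior the patch in PR #17201 builds on.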



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
