[ https://issues.apache.org/jira/browse/BEAM-14187?focusedWorklogId=755353&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-755353 ]

ASF GitHub Bot logged work on BEAM-14187:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Apr/22 17:55
            Start Date: 11/Apr/22 17:55
    Worklog Time Spent: 10m
      Work Description:
lukecwik commented on code in PR #17201:
URL: https://github.com/apache/beam/pull/17201#discussion_r847592739


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java:
##########
@@ -502,30 +513,65 @@ private Optional<SeekableByteChannel> initializeForKeyedRead(
 
     inChannel = initializeBloomFilterAndIndexPerShard(inChannel);
 
-    checkState(indexPerShard != null,
+    checkState(
+        indexPerShard != null,
         "indexPerShard must not be null after initializeBloomFilterAndIndexPerShard.");
 
-    // If the index has been populated and contains the shard id, we can return.
-    if (indexPerShard != null && indexPerShard.containsKey(shardId)) {
-      checkState(bloomFilter != null, "Bloom filter expected to have been initialized.");
+    if (indexPerShard.containsKey(shardId)) {
+      // ConcurrentHashMap.containsKey doesn't lock a bin, so it's faster than `computeIfAbsent` for
+      // a present key.
       return inChannel;
     }
 
-    Long startOfNextBlock = shardOffsetToShardMap.higherKey(shardWithIndex.getBlockOffset());
-    // If this is the last block, then we need to grab the position of the Bloom filter
-    // as the upper bound.
-    if (startOfNextBlock == null) {
-      startOfNextBlock = footer.getBloomFilterPosition();
-    }
+    // Using AtomicReference for an object holder. Actually, atomicity is not required.
+    AtomicReference<SeekableByteChannel> rawChannelReference =
+        new AtomicReference<>(inChannel.orNull());
+
+    // JDK-8161372 (ConcurrentHashMap.computeIfAbsent locks bin when k present) alleviated the
+    // performance loss by not acquiring lock for the first node. But, the fix was applied to Java9,
+    // and it still has chance to lock the bin containing the key at the second or next nodes. As we
+    // expect `indexPerShard` already has the shardId in most cases, it would have a better
+    // performance to check `containsKey` above before invoking `computeIfAbsent` here.
+    indexPerShard.computeIfAbsent(
+        shardId,
+        ignored -> {
+          Long startOfNextBlock = shardOffsetToShardMap.higherKey(shardWithIndex.getBlockOffset());
+          // If this is the last block, then we need to grab the position of the Bloom filter
+          // as the upper bound.
+          if (startOfNextBlock == null) {
+            startOfNextBlock = footer.getBloomFilterPosition();
+          }
+
+          checkState(
+              shardWithIndex.getIndexOffset() < startOfNextBlock,
+              "Expected the index start offset is less than the next block start offset. "
+                  + "But, IsmShard is '%s' and the next block offset is %s for resourceId '%s'",
+              shardWithIndex,
+              startOfNextBlock,
+              resourceId);
+
+          try {
+            SeekableByteChannel rawChannel = openIfNeeded(Optional.of(rawChannelReference.get()));
+            rawChannelReference.set(rawChannel);
+            return readIndexBlockForShard(resourceId, shardWithIndex, startOfNextBlock, rawChannel);
+          } catch (IOException e) {
+            // Wrapping with RuntimeException
+            throw new RuntimeException(
+                "failed to read shard index for resourceId: " + resourceId + " shardId: " + shardId,

Review Comment:
```suggestion
                "Failed to read shard index for resourceId: " + resourceId + " shardId: " + shardId,
```


Issue Time Tracking
-------------------

    Worklog Id:     (was: 755353)
    Time Spent: 2h 40m  (was: 2.5h)

> NullPointerException or IllegalStateException at IsmReaderImpl in Dataflow
> --------------------------------------------------------------------------
>
>                 Key: BEAM-14187
>                 URL: https://issues.apache.org/jira/browse/BEAM-14187
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>            Reporter: Minbo Bae
>            Priority: P2
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> h6. Problem
> Dataflow Java batch jobs with large side inputs intermittently throw
> {{NullPointerException}} or {{IllegalStateException}}.
>  * [NullPointerException|https://gist.githubusercontent.com/baeminbo/459e283eadbc7752c9f23616b52d958a/raw/f0480b8eaff590fb3f3ae2ab98ddce7dd3b4a237/npe.png]
>    happens at
>    [IsmReaderImpl.overKeyComponents|https://github.com/apache/beam/blob/v2.37.0/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L217].
>  * [IllegalStateException|https://gist.githubusercontent.com/baeminbo/459e283eadbc7752c9f23616b52d958a/raw/f0480b8eaff590fb3f3ae2ab98ddce7dd3b4a237/IllegalStateException.png]
>    happens at
>    [IsmReaderImpl.initializeForKeyedRead|https://github.com/apache/beam/blob/v2.37.0/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L500].
> (All error logs from the Dataflow job are
> [here|https://gist.githubusercontent.com/baeminbo/459e283eadbc7752c9f23616b52d958a/raw/f0480b8eaff590fb3f3ae2ab98ddce7dd3b4a237/downloaded-logs-20220327-171955.json].)
> h6. Hypothesis
> {{initializeForKeyedRead}} is not synchronized. Multiple threads can
> enter the method, initialize the index for the same shard, and update
> {{indexPerShard}} without synchronization. {{overKeyComponents}} also
> accesses {{indexPerShard}} without synchronization. As {{indexPerShard}}
> is just a {{HashMap}}, which is not thread-safe, this can cause the
> {{NullPointerException}} and {{IllegalStateException}} above.
> h6. Suggestion
> I think we can fix this issue by changing the type of {{indexPerShard}} to a
> thread-safe map (e.g. {{ConcurrentHashMap}}).



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
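
For readers following the thread, the pattern discussed above (a thread-safe map with a cheap read before `computeIfAbsent`) can be sketched in isolation. This is only an illustration under stated assumptions: `ShardIndexCache`, `getOrLoad`, `loadCount`, and `runDemo` are hypothetical names, the values are plain strings rather than real shard indexes, and none of this is the Beam code. It uses `get(...) != null` as the fast path, which for `ConcurrentHashMap` is equivalent to the `containsKey` check in the diff.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical stand-in for a per-shard index cache; not the Beam class. */
public class ShardIndexCache {
  // Thread-safe replacement for a plain HashMap shared between reader threads.
  private final Map<Integer, String> indexPerShard = new ConcurrentHashMap<>();
  // Counts how often the (possibly expensive) loader actually ran.
  private final AtomicInteger loads = new AtomicInteger();

  /** Returns the index for shardId, running the loader at most once per shard. */
  public String getOrLoad(int shardId, Function<Integer, String> loader) {
    // Fast path: a plain read that does not lock a bin.
    String existing = indexPerShard.get(shardId);
    if (existing != null) {
      return existing;
    }
    // Slow path: computeIfAbsent applies the function atomically,
    // so concurrent callers for the same key trigger a single load.
    return indexPerShard.computeIfAbsent(
        shardId,
        id -> {
          loads.incrementAndGet();
          return loader.apply(id);
        });
  }

  public int loadCount() {
    return loads.get();
  }

  /** Starts several threads that all request the same shard; returns the load count. */
  public static int runDemo(int threads) throws InterruptedException {
    ShardIndexCache cache = new ShardIndexCache();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    CountDownLatch start = new CountDownLatch(1);
    for (int i = 0; i < threads; i++) {
      pool.execute(
          () -> {
            try {
              start.await(); // Release all workers at roughly the same time.
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              return;
            }
            cache.getOrLoad(7, id -> "index-for-shard-" + id);
          });
    }
    start.countDown();
    pool.shutdown();
    if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
      throw new IllegalStateException("workers did not finish in time");
    }
    return cache.loadCount();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("loads = " + runDemo(8));
  }
}
```

With a plain `HashMap` in place of the `ConcurrentHashMap`, the same demo has no such guarantee: concurrent writers may load the same shard repeatedly or corrupt the map, which matches the hypothesis in the quoted issue.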