Repository: incubator-beam
Updated Branches:
  refs/heads/master 2ffecfda2 -> 731c03036

AvroSource: synchronization fix

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/20a58873
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/20a58873
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/20a58873

Branch: refs/heads/master
Commit: 20a5887361416db516001aec984a58f7da7b3e50
Parents: 2ffecfd
Author: Dan Halperin <[email protected]>
Authored: Mon May 23 12:57:26 2016 -0700
Committer: Dan Halperin <[email protected]>
Committed: Tue May 31 10:09:43 2016 -0700

----------------------------------------------------------------------
 .../core/src/main/java/org/apache/beam/sdk/io/AvroSource.java | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/20a58873/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index 255199f..9cc0b98 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -494,7 +494,10 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     // Postcondition: same as above, but for the new current (formerly next) block.
     @Override
     public boolean readNextBlock() throws IOException {
-      long startOfNextBlock = currentBlockOffset + currentBlockSizeBytes;
+      long startOfNextBlock;
+      synchronized (progressLock) {
+        startOfNextBlock = currentBlockOffset + currentBlockSizeBytes;
+      }
 
       // Before reading the variable-sized block header, record the current number of bytes read.
       long preHeaderCount = countStream.getBytesRead();
