Repository: incubator-beam
Updated Branches:
  refs/heads/master 2ffecfda2 -> 731c03036


AvroSource: synchronization fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/20a58873
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/20a58873
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/20a58873

Branch: refs/heads/master
Commit: 20a5887361416db516001aec984a58f7da7b3e50
Parents: 2ffecfd
Author: Dan Halperin <[email protected]>
Authored: Mon May 23 12:57:26 2016 -0700
Committer: Dan Halperin <[email protected]>
Committed: Tue May 31 10:09:43 2016 -0700

----------------------------------------------------------------------
 .../core/src/main/java/org/apache/beam/sdk/io/AvroSource.java   | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/20a58873/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index 255199f..9cc0b98 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -494,7 +494,10 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     // Postcondition: same as above, but for the new current (formerly next) 
block.
     @Override
     public boolean readNextBlock() throws IOException {
-      long startOfNextBlock = currentBlockOffset + currentBlockSizeBytes;
+      long startOfNextBlock;
+      synchronized (progressLock) {
+        startOfNextBlock = currentBlockOffset + currentBlockSizeBytes;
+      }
 
       // Before reading the variable-sized block header, record the current 
number of bytes read.
       long preHeaderCount = countStream.getBytesRead();

Reply via email to