GIRAPH-915: With BigDataIO some messages can get ignored (majakabiljo via pavanka)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/58576c81
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/58576c81
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/58576c81

Branch: refs/heads/release-1.1
Commit: 58576c81fed9f4eccb03366151c1cf280765d237
Parents: ffdddff
Author: Pavan Kumar <[email protected]>
Authored: Wed Jun 18 15:38:30 2014 -0700
Committer: Pavan Kumar <[email protected]>
Committed: Wed Jun 18 15:38:30 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |  2 ++
 .../SendWorkerOneToAllMessagesRequest.java      |  2 +-
 .../org/apache/giraph/edge/ByteArrayEdges.java  |  2 +-
 .../apache/giraph/utils/ByteStructIterator.java |  2 +-
 .../utils/ByteStructVertexIdIterator.java       |  2 +-
 .../utils/ExtendedByteArrayDataInput.java       |  5 ++++
 .../apache/giraph/utils/ExtendedDataInput.java  |  7 +++++
 .../apache/giraph/utils/UnsafeArrayReads.java   |  5 ++++
 .../org/apache/giraph/utils/VertexIterator.java |  2 +-
 .../apache/giraph/utils/io/BigDataInput.java    | 31 +++++++++++++-------
 10 files changed, 44 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/58576c81/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index f35d4ba..659edfd 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-915: With BigDataIO some messages can get ignored (majakabiljo via pavanka)
+
   GIRAPH-918: GIRAPH-908 has a small bug reg counting entries (pavanka)
 
   GIRAPH-842: option to dump histogram of memory usage when heap is low on memory (pavanka)

http://git-wip-us.apache.org/repos/asf/giraph/blob/58576c81/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneToAllMessagesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneToAllMessagesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneToAllMessagesRequest.java
index 8745adb..5f1ed53 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneToAllMessagesRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneToAllMessagesRequest.java
@@ -116,7 +116,7 @@ public class SendWorkerOneToAllMessagesRequest<I extends WritableComparable,
     int idCount = 0;
     int partitionId = 0;
     try {
-      while (reader.available() != 0) {
+      while (!reader.endOfInput()) {
         msg.readFields(reader);
         idCount = reader.readInt();
         for (int i = 0; i < idCount; i++) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/58576c81/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java
index 271e9c5..509546c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java
@@ -159,7 +159,7 @@ public class ByteArrayEdges<I extends WritableComparable, E extends Writable>
 
     @Override
     public boolean hasNext() {
-      return serializedEdges != null && extendedDataInput.available() > 0;
+      return serializedEdges != null && !extendedDataInput.endOfInput();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/58576c81/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterator.java
index 322365c..1f1b90e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterator.java
@@ -44,7 +44,7 @@ public abstract class ByteStructIterator<T extends Writable> implements
 
   @Override
   public boolean hasNext() {
-    return extendedDataInput.available() > 0;
+    return !extendedDataInput.endOfInput();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/58576c81/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdIterator.java
index 3d564cd..3b880a0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdIterator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdIterator.java
@@ -54,7 +54,7 @@ public abstract class ByteStructVertexIdIterator<I extends WritableComparable>
 
   @Override
   public boolean hasNext() {
-    return extendedDataInput.available() > 0;
+    return !extendedDataInput.endOfInput();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/58576c81/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
index 3eae25b..56c79c4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
@@ -57,6 +57,11 @@ public class ExtendedByteArrayDataInput extends ByteArrayInputStream
   }
 
   @Override
+  public boolean endOfInput() {
+    return available() == 0;
+  }
+
+  @Override
   public void readFully(byte[] b) throws IOException {
     dataInput.readFully(b);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/58576c81/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataInput.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataInput.java
index f1c6809..96096ed 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataInput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataInput.java
@@ -36,4 +36,11 @@ public interface ExtendedDataInput extends DataInput {
    * @return Bytes available
    */
   int available();
+
+  /**
+   * Check if we read everything from the input
+   *
+   * @return True iff we read everything from the input
+   */
+  boolean endOfInput();
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/58576c81/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
index db19fda..1ab8de6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
@@ -84,6 +84,11 @@ public class UnsafeArrayReads extends UnsafeReads {
     return (int) (bufLength - pos);
   }
 
+  @Override
+  public boolean endOfInput() {
+    return available() == 0;
+  }
+
 
   @Override
   public int getPos() {

http://git-wip-us.apache.org/repos/asf/giraph/blob/58576c81/giraph-core/src/main/java/org/apache/giraph/utils/VertexIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIterator.java
index dced9bd..dd73b1f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIterator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIterator.java
@@ -77,7 +77,7 @@ public class VertexIterator<I extends WritableComparable,
    * @return True if the iteration has more elements.
    */
   public boolean hasNext() {
-    return extendedDataInput.available() > 0;
+    return !extendedDataInput.endOfInput();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/58576c81/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInput.java b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInput.java
index f73819a..2454a37 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInput.java
@@ -76,7 +76,7 @@ public class BigDataInput implements ExtendedDataInput {
    * next one if needed.
    */
   private void checkIfShouldMoveToNextDataInput() {
-    if (currentInput.available() == 0) {
+    if (currentInput.endOfInput()) {
       moveToNextDataInput();
     }
   }
@@ -168,12 +168,17 @@ public class BigDataInput implements ExtendedDataInput {
   @Override
   public int skipBytes(int n) throws IOException {
     int bytesLeftToSkip = n;
-    while (bytesLeftToSkip >= currentInput.available()) {
-      bytesLeftToSkip -= currentInput.available();
-      moveToNextDataInput();
+    while (bytesLeftToSkip > 0) {
+      int bytesSkipped = currentInput.skipBytes(bytesLeftToSkip);
+      bytesLeftToSkip -= bytesSkipped;
+      if (bytesLeftToSkip > 0) {
+        moveToNextDataInput();
+        if (endOfInput()) {
+          break;
+        }
+      }
     }
-    int bytesSkipped = currentInput.skipBytes(bytesLeftToSkip);
-    return n - bytesLeftToSkip + bytesSkipped;
+    return n - bytesLeftToSkip;
   }
 
   @Override
@@ -187,10 +192,14 @@ public class BigDataInput implements ExtendedDataInput {
 
   @Override
   public int available() {
-    int available = 0;
-    for (int i = currentPositionInInputs; i < dataInputs.size(); i++) {
-      available += dataInputs.get(i).available();
-    }
-    return available;
+    throw new UnsupportedOperationException("available: " +
+        "Not supported with BigDataIO because overflow can happen");
+  }
+
+  @Override
+  public boolean endOfInput() {
+    return currentInput == EMPTY_INPUT ||
+        (dataInputs.get(currentPositionInInputs).endOfInput() &&
+            currentPositionInInputs == dataInputs.size() - 1);
   }
 }
