Repository: spark Updated Branches: refs/heads/master cfb25b27c -> ebbe589d1
[SPARK-21271][SQL] Ensure Unsafe.sizeInBytes is a multiple of 8 ## What changes were proposed in this pull request? This PR ensures that `UnsafeRow.sizeInBytes` is always a multiple of 8. If it is not, `UnsafeRow.hashCode` causes an assertion violation. ## How was this patch tested? Will add test cases Author: Kazuaki Ishizaki <[email protected]> Closes #18503 from kiszk/SPARK-21271. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ebbe589d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ebbe589d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ebbe589d Branch: refs/heads/master Commit: ebbe589d12434bc108672268bee05a7b7e571ee6 Parents: cfb25b2 Author: Kazuaki Ishizaki <[email protected]> Authored: Thu Jul 27 15:27:24 2017 +0800 Committer: Wenchen Fan <[email protected]> Committed: Thu Jul 27 15:27:24 2017 +0800 ---------------------------------------------------------------------- .../expressions/FixedLengthRowBasedKeyValueBatch.java | 6 +++--- .../spark/sql/catalyst/expressions/UnsafeRow.java | 2 ++ .../VariableLengthRowBasedKeyValueBatch.java | 6 +++--- .../spark/sql/execution/UnsafeExternalRowSorter.java | 7 ++++--- .../spark/sql/execution/UnsafeKVExternalSorter.java | 6 +++--- .../streaming/state/HDFSBackedStateStoreProvider.scala | 12 ++++++++++-- 6 files changed, 25 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ebbe589d/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java index a88a315..df52f9c 100644 --- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java @@ -62,7 +62,7 @@ public final class FixedLengthRowBasedKeyValueBatch extends RowBasedKeyValueBatc keyRowId = numRows; keyRow.pointTo(base, recordOffset, klen); - valueRow.pointTo(base, recordOffset + klen, vlen + 4); + valueRow.pointTo(base, recordOffset + klen, vlen); numRows++; return valueRow; } @@ -95,7 +95,7 @@ public final class FixedLengthRowBasedKeyValueBatch extends RowBasedKeyValueBatc getKeyRow(rowId); } assert(rowId >= 0); - valueRow.pointTo(base, keyRow.getBaseOffset() + klen, vlen + 4); + valueRow.pointTo(base, keyRow.getBaseOffset() + klen, vlen); return valueRow; } @@ -131,7 +131,7 @@ public final class FixedLengthRowBasedKeyValueBatch extends RowBasedKeyValueBatc } key.pointTo(base, offsetInPage, klen); - value.pointTo(base, offsetInPage + klen, vlen + 4); + value.pointTo(base, offsetInPage + klen, vlen); offsetInPage += recordLength; recordsInPage -= 1; http://git-wip-us.apache.org/repos/asf/spark/blob/ebbe589d/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java index 56994fa..ec947d7 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java @@ -167,6 +167,7 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo */ public void pointTo(Object baseObject, long baseOffset, int sizeInBytes) { assert numFields >= 0 : "numFields (" + numFields + ") should >= 0"; + assert sizeInBytes % 8 == 0 : 
"sizeInBytes (" + sizeInBytes + ") should be a multiple of 8"; this.baseObject = baseObject; this.baseOffset = baseOffset; this.sizeInBytes = sizeInBytes; @@ -183,6 +184,7 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo } public void setTotalSize(int sizeInBytes) { + assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8"; this.sizeInBytes = sizeInBytes; } http://git-wip-us.apache.org/repos/asf/spark/blob/ebbe589d/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java index ea4f984..905e682 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java @@ -65,7 +65,7 @@ public final class VariableLengthRowBasedKeyValueBatch extends RowBasedKeyValueB keyRowId = numRows; keyRow.pointTo(base, recordOffset + 8, klen); - valueRow.pointTo(base, recordOffset + 8 + klen, vlen + 4); + valueRow.pointTo(base, recordOffset + 8 + klen, vlen); numRows++; return valueRow; } @@ -102,7 +102,7 @@ public final class VariableLengthRowBasedKeyValueBatch extends RowBasedKeyValueB long offset = keyRow.getBaseOffset(); int klen = keyRow.getSizeInBytes(); int vlen = Platform.getInt(base, offset - 8) - klen - 4; - valueRow.pointTo(base, offset + klen, vlen + 4); + valueRow.pointTo(base, offset + klen, vlen); return valueRow; } @@ -146,7 +146,7 @@ public final class VariableLengthRowBasedKeyValueBatch extends RowBasedKeyValueB currentvlen = totalLength - currentklen; key.pointTo(base, 
offsetInPage + 8, currentklen); - value.pointTo(base, offsetInPage + 8 + currentklen, currentvlen + 4); + value.pointTo(base, offsetInPage + 8 + currentklen, currentvlen); offsetInPage += 8 + totalLength + 8; recordsInPage -= 1; http://git-wip-us.apache.org/repos/asf/spark/blob/ebbe589d/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java index aadfcaa..53b0886 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java @@ -208,9 +208,10 @@ public final class UnsafeExternalRowSorter { @Override public int compare(Object baseObj1, long baseOff1, Object baseObj2, long baseOff2) { - // TODO: Why are the sizes -1? - row1.pointTo(baseObj1, baseOff1, -1); - row2.pointTo(baseObj2, baseOff2, -1); + // Note that since ordering doesn't need the total length of the record, we just pass 0 + // into the row. 
+ row1.pointTo(baseObj1, baseOff1, 0); + row2.pointTo(baseObj2, baseOff2, 0); return ordering.compare(row1, row2); } } http://git-wip-us.apache.org/repos/asf/spark/blob/ebbe589d/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java index ee5bcfd..d8acf11 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java @@ -238,10 +238,10 @@ public final class UnsafeKVExternalSorter { @Override public int compare(Object baseObj1, long baseOff1, Object baseObj2, long baseOff2) { - // Note that since ordering doesn't need the total length of the record, we just pass -1 + // Note that since ordering doesn't need the total length of the record, we just pass 0 // into the row. 
- row1.pointTo(baseObj1, baseOff1 + 4, -1); - row2.pointTo(baseObj2, baseOff2 + 4, -1); + row1.pointTo(baseObj1, baseOff1 + 4, 0); + row2.pointTo(baseObj2, baseOff2 + 4, 0); return ordering.compare(row1, row2); } } http://git-wip-us.apache.org/repos/asf/spark/blob/ebbe589d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index fa4c99c..e0c2e94 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -369,7 +369,11 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit val valueRowBuffer = new Array[Byte](valueSize) ByteStreams.readFully(input, valueRowBuffer, 0, valueSize) val valueRow = new UnsafeRow(valueSchema.fields.length) - valueRow.pointTo(valueRowBuffer, valueSize) + // If valueSize in existing file is not multiple of 8, floor it to multiple of 8. + // This is a workaround for the following: + // Prior to Spark 2.3 mistakenly append 4 bytes to the value row in + // `RowBasedKeyValueBatch`, which gets persisted into the checkpoint data + valueRow.pointTo(valueRowBuffer, (valueSize / 8) * 8) map.put(keyRow, valueRow) } } @@ -433,7 +437,11 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit val valueRowBuffer = new Array[Byte](valueSize) ByteStreams.readFully(input, valueRowBuffer, 0, valueSize) val valueRow = new UnsafeRow(valueSchema.fields.length) - valueRow.pointTo(valueRowBuffer, valueSize) + // If valueSize in existing file is not multiple of 8, floor it to multiple of 8. 
+ // This is a workaround for the following: + // Prior to Spark 2.3 mistakenly append 4 bytes to the value row in + // `RowBasedKeyValueBatch`, which gets persisted into the checkpoint data + valueRow.pointTo(valueRowBuffer, (valueSize / 8) * 8) map.put(keyRow, valueRow) } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
