Repository: spark
Updated Branches:
refs/heads/branch-1.6 3cc3d8578 -> f12f11e57
[SPARK-14138] [SQL] Fix generated SpecificColumnarIterator code can exceed JVM
size limit for cached DataFrames
## What changes were proposed in this pull request?
This PR reduces the Java bytecode size of the methods in ```SpecificColumnarIterator```
by using two approaches:
1. Generate and call ```getTYPEColumnAccessor()``` for each type that is
actually used, in order to instantiate the accessors
2. Group a large number of method calls (more than 4000) into separate methods
## How was this patch tested?
Added a new unit test to ```InMemoryColumnarQuerySuite```
Here is the generated code:
```java
/* 033 */ private org.apache.spark.sql.execution.columnar.CachedBatch batch =
null;
/* 034 */
/* 035 */ private org.apache.spark.sql.execution.columnar.IntColumnAccessor
accessor;
/* 036 */ private org.apache.spark.sql.execution.columnar.IntColumnAccessor
accessor1;
/* 037 */
/* 038 */ public SpecificColumnarIterator() {
/* 039 */ this.nativeOrder = ByteOrder.nativeOrder();
/* 040 */ this.mutableRow = new MutableUnsafeRow(rowWriter);
/* 041 */ }
/* 042 */
/* 043 */ public void initialize(Iterator input, DataType[] columnTypes,
int[] columnIndexes,
/* 044 */ boolean columnNullables[]) {
/* 045 */ this.input = input;
/* 046 */ this.columnTypes = columnTypes;
/* 047 */ this.columnIndexes = columnIndexes;
/* 048 */ }
/* 049 */
/* 050 */
/* 051 */ private org.apache.spark.sql.execution.columnar.IntColumnAccessor
getIntColumnAccessor(int idx) {
/* 052 */ byte[] buffer = batch.buffers()[columnIndexes[idx]];
/* 053 */ return new
org.apache.spark.sql.execution.columnar.IntColumnAccessor(ByteBuffer.wrap(buffer).order(nativeOrder));
/* 054 */ }
/* 055 */
/* 056 */
/* 057 */
/* 058 */
/* 059 */
/* 060 */
/* 061 */ public boolean hasNext() {
/* 062 */ if (currentRow < numRowsInBatch) {
/* 063 */ return true;
/* 064 */ }
/* 065 */ if (!input.hasNext()) {
/* 066 */ return false;
/* 067 */ }
/* 068 */
/* 069 */ batch = (org.apache.spark.sql.execution.columnar.CachedBatch)
input.next();
/* 070 */ currentRow = 0;
/* 071 */ numRowsInBatch = batch.numRows();
/* 072 */ accessor = getIntColumnAccessor(0);
/* 073 */ accessor1 = getIntColumnAccessor(1);
/* 074 */
/* 075 */ return hasNext();
/* 076 */ }
/* 077 */
/* 078 */ public InternalRow next() {
/* 079 */ currentRow += 1;
/* 080 */ bufferHolder.reset();
/* 081 */ rowWriter.zeroOutNullBytes();
/* 082 */ accessor.extractTo(mutableRow, 0);
/* 083 */ accessor1.extractTo(mutableRow, 1);
/* 084 */ unsafeRow.setTotalSize(bufferHolder.totalSize());
/* 085 */ return unsafeRow;
/* 086 */ }
```
(If this patch involves UI changes, please attach a screenshot; otherwise,
remove this)
Author: Kazuaki Ishizaki <[email protected]>
Closes #11984 from kiszk/SPARK-14138.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f12f11e5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f12f11e5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f12f11e5
Branch: refs/heads/branch-1.6
Commit: f12f11e578169b47e3f8b18b299948c0670ba585
Parents: 3cc3d85
Author: Kazuaki Ishizaki <[email protected]>
Authored: Thu Mar 31 15:05:48 2016 -0700
Committer: Davies Liu <[email protected]>
Committed: Thu Mar 31 15:05:48 2016 -0700
----------------------------------------------------------------------
.../columnar/GenerateColumnAccessor.scala | 50 +++++++++++++++++---
.../columnar/InMemoryColumnarQuerySuite.scala | 10 ++++
2 files changed, 54 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/f12f11e5/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index eaafc96..4d01b78 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.columnar
+import scala.collection.mutable
+
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -88,7 +90,7 @@ object GenerateColumnAccessor extends
CodeGenerator[Seq[DataType], ColumnarItera
case array: ArrayType => classOf[ArrayColumnAccessor].getName
case t: MapType => classOf[MapColumnAccessor].getName
}
- ctx.addMutableState(accessorCls, accessorName, s"$accessorName = null;")
+ ctx.addMutableState(accessorCls, accessorName, "")
val createCode = dt match {
case t if ctx.isPrimitiveType(dt) =>
@@ -97,7 +99,7 @@ object GenerateColumnAccessor extends
CodeGenerator[Seq[DataType], ColumnarItera
s"$accessorName = new
$accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder));"
case other =>
s"""$accessorName = new
$accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder),
- (${dt.getClass.getName}) columnTypes[$index]);"""
+ (${dt.getClass.getName}) columnTypes[$index]);"""
}
val extract = s"$accessorName.extractTo(mutableRow, $index);"
@@ -114,6 +116,42 @@ object GenerateColumnAccessor extends
CodeGenerator[Seq[DataType], ColumnarItera
(createCode, extract + patch)
}.unzip
+ /*
+ * 200 = 6000 bytes / 30 (up to 30 bytes per call).
+ * The maximum bytecode size of a method that HotSpot will compile is 8000 bytes,
+ * so each generated method should be kept below 8000 bytes.
+ */
+ val numberOfStatementsThreshold = 200
+ val (initializerAccessorCalls, extractorCalls) =
+ if (initializeAccessors.length <= numberOfStatementsThreshold) {
+ (initializeAccessors.mkString("\n"), extractors.mkString("\n"))
+ } else {
+ val groupedAccessorsItr =
initializeAccessors.grouped(numberOfStatementsThreshold)
+ val groupedExtractorsItr =
extractors.grouped(numberOfStatementsThreshold)
+ var groupedAccessorsLength = 0
+ groupedAccessorsItr.zipWithIndex.map { case (body, i) =>
+ groupedAccessorsLength += 1
+ val funcName = s"accessors$i"
+ val funcCode = s"""
+ |private void $funcName() {
+ | ${body.mkString("\n")}
+ |}
+ """.stripMargin
+ ctx.addNewFunction(funcName, funcCode)
+ }
+ groupedExtractorsItr.zipWithIndex.map { case (body, i) =>
+ val funcName = s"extractors$i"
+ val funcCode = s"""
+ |private void $funcName() {
+ | ${body.mkString("\n")}
+ |}
+ """.stripMargin
+ ctx.addNewFunction(funcName, funcCode)
+ }
+ ((0 to groupedAccessorsLength - 1).map { i => s"accessors$i();"
}.mkString("\n"),
+ (0 to groupedAccessorsLength - 1).map { i => s"extractors$i();"
}.mkString("\n"))
+ }
+
val code = s"""
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -149,8 +187,6 @@ object GenerateColumnAccessor extends
CodeGenerator[Seq[DataType], ColumnarItera
this.nativeOrder = ByteOrder.nativeOrder();
this.buffers = new byte[${columnTypes.length}][];
this.mutableRow = new MutableUnsafeRow(rowWriter);
-
- ${initMutableStates(ctx)}
}
public void initialize(Iterator input, DataType[] columnTypes, int[]
columnIndexes) {
@@ -159,6 +195,8 @@ object GenerateColumnAccessor extends
CodeGenerator[Seq[DataType], ColumnarItera
this.columnIndexes = columnIndexes;
}
+ ${declareAddedFunctions(ctx)}
+
public boolean hasNext() {
if (currentRow < numRowsInBatch) {
return true;
@@ -173,7 +211,7 @@ object GenerateColumnAccessor extends
CodeGenerator[Seq[DataType], ColumnarItera
for (int i = 0; i < columnIndexes.length; i ++) {
buffers[i] = batch.buffers()[columnIndexes[i]];
}
- ${initializeAccessors.mkString("\n")}
+ ${initializerAccessorCalls}
return hasNext();
}
@@ -182,7 +220,7 @@ object GenerateColumnAccessor extends
CodeGenerator[Seq[DataType], ColumnarItera
currentRow += 1;
bufferHolder.reset();
rowWriter.initialize(bufferHolder, $numFields);
- ${extractors.mkString("\n")}
+ ${extractorCalls}
unsafeRow.pointTo(bufferHolder.buffer, $numFields,
bufferHolder.totalSize());
return unsafeRow;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/f12f11e5/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 25afed2..557415b 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -219,4 +219,14 @@ class InMemoryColumnarQuerySuite extends QueryTest with
SharedSQLContext {
assert(data.count() === 10)
assert(data.filter($"s" === "3").count() === 1)
}
+
+ test("SPARK-14138: Generated SpecificColumnarIterator can exceed JVM size
limit for cached DF") {
+ val length1 = 3999
+ val columnTypes1 = List.fill(length1)(IntegerType)
+ val columnarIterator1 = GenerateColumnAccessor.generate(columnTypes1)
+
+ val length2 = 10000
+ val columnTypes2 = List.fill(length2)(IntegerType)
+ val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]