[ https://issues.apache.org/jira/browse/SPARK-18394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Michael Armbrust updated SPARK-18394: ------------------------------------- Target Version/s: 2.3.0 (was: 2.2.0) > Executing the same query twice in a row results in CodeGenerator cache misses > ----------------------------------------------------------------------------- > > Key: SPARK-18394 > URL: https://issues.apache.org/jira/browse/SPARK-18394 > Project: Spark > Issue Type: Bug > Components: SQL > Environment: HiveThriftServer2 running on branch-2.0 on Mac laptop > Reporter: Jonny Serencsa > > Executing the query: > {noformat} > select > l_returnflag, > l_linestatus, > sum(l_quantity) as sum_qty, > sum(l_extendedprice) as sum_base_price, > sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, > sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, > avg(l_quantity) as avg_qty, > avg(l_extendedprice) as avg_price, > avg(l_discount) as avg_disc, > count(*) as count_order > from > lineitem_1_row > where > l_shipdate <= date_sub('1998-12-01', '90') > group by > l_returnflag, > l_linestatus > ; > {noformat} > twice in succession results in CodeGenerator cache misses in BOTH > executions. Since the query is identical, I would expect the same code to be > generated. > It turns out that the generated code is not exactly the same between the two > runs, which results in cache misses when performing the lookup in the > CodeGenerator cache, even though the code is equivalent. 
> Below is (some portion of the) generated code for two runs of the query: > run-1 > {noformat} > import java.nio.ByteBuffer; > import java.nio.ByteOrder; > import scala.collection.Iterator; > import org.apache.spark.sql.types.DataType; > import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder; > import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter; > import org.apache.spark.sql.execution.columnar.MutableUnsafeRow; > public SpecificColumnarIterator generate(Object[] references) { > return new SpecificColumnarIterator(); > } > class SpecificColumnarIterator extends > org.apache.spark.sql.execution.columnar.ColumnarIterator { > private ByteOrder nativeOrder = null; > private byte[][] buffers = null; > private UnsafeRow unsafeRow = new UnsafeRow(7); > private BufferHolder bufferHolder = new BufferHolder(unsafeRow); > private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7); > private MutableUnsafeRow mutableRow = null; > private int currentRow = 0; > private int numRowsInBatch = 0; > private scala.collection.Iterator input = null; > private DataType[] columnTypes = null; > private int[] columnIndexes = null; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor1; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor2; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor3; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor4; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor5; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor6; > public SpecificColumnarIterator() { > this.nativeOrder = ByteOrder.nativeOrder(); > this.buffers = new byte[7][]; > this.mutableRow = new MutableUnsafeRow(rowWriter); > } > public void initialize(Iterator input, DataType[] columnTypes, 
int[] > columnIndexes) { > this.input = input; > this.columnTypes = columnTypes; > this.columnIndexes = columnIndexes; > } > public boolean hasNext() { > if (currentRow < numRowsInBatch) { > return true; > } > if (!input.hasNext()) { > return false; > } > org.apache.spark.sql.execution.columnar.CachedBatch batch = > (org.apache.spark.sql.execution.columnar.CachedBatch) input.next(); > currentRow = 0; > numRowsInBatch = batch.numRows(); > for (int i = 0; i < columnIndexes.length; i ++) { > buffers[i] = batch.buffers()[columnIndexes[i]]; > } > accessor = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[0]).order(nativeOrder)); > accessor1 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[1]).order(nativeOrder)); > accessor2 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[2]).order(nativeOrder)); > accessor3 = new > org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[3]).order(nativeOrder)); > accessor4 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[4]).order(nativeOrder)); > accessor5 = new > org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[5]).order(nativeOrder)); > accessor6 = new > org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[6]).order(nativeOrder)); > return hasNext(); > } > public InternalRow next() { > currentRow += 1; > bufferHolder.reset(); > rowWriter.zeroOutNullBytes(); > accessor.extractTo(mutableRow, 0); > accessor1.extractTo(mutableRow, 1); > accessor2.extractTo(mutableRow, 2); > accessor3.extractTo(mutableRow, 3); > accessor4.extractTo(mutableRow, 4); > accessor5.extractTo(mutableRow, 5); > accessor6.extractTo(mutableRow, 6); > unsafeRow.setTotalSize(bufferHolder.totalSize()); > return unsafeRow; > } > } > {noformat} > run-2: > {noformat} > import java.nio.ByteBuffer; > 
import java.nio.ByteOrder; > import scala.collection.Iterator; > import org.apache.spark.sql.types.DataType; > import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder; > import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter; > import org.apache.spark.sql.execution.columnar.MutableUnsafeRow; > public SpecificColumnarIterator generate(Object[] references) { > return new SpecificColumnarIterator(); > } > class SpecificColumnarIterator extends > org.apache.spark.sql.execution.columnar.ColumnarIterator { > private ByteOrder nativeOrder = null; > private byte[][] buffers = null; > private UnsafeRow unsafeRow = new UnsafeRow(7); > private BufferHolder bufferHolder = new BufferHolder(unsafeRow); > private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7); > private MutableUnsafeRow mutableRow = null; > private int currentRow = 0; > private int numRowsInBatch = 0; > private scala.collection.Iterator input = null; > private DataType[] columnTypes = null; > private int[] columnIndexes = null; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor1; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor2; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor3; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor4; > private org.apache.spark.sql.execution.columnar.StringColumnAccessor > accessor5; > private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor > accessor6; > public SpecificColumnarIterator() { > this.nativeOrder = ByteOrder.nativeOrder(); > this.buffers = new byte[7][]; > this.mutableRow = new MutableUnsafeRow(rowWriter); > } > public void initialize(Iterator input, DataType[] columnTypes, int[] > columnIndexes) { > this.input = input; > this.columnTypes = columnTypes; > this.columnIndexes = columnIndexes; > } > public 
boolean hasNext() { > if (currentRow < numRowsInBatch) { > return true; > } > if (!input.hasNext()) { > return false; > } > org.apache.spark.sql.execution.columnar.CachedBatch batch = > (org.apache.spark.sql.execution.columnar.CachedBatch) input.next(); > currentRow = 0; > numRowsInBatch = batch.numRows(); > for (int i = 0; i < columnIndexes.length; i ++) { > buffers[i] = batch.buffers()[columnIndexes[i]]; > } > accessor = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[0]).order(nativeOrder)); > accessor1 = new > org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[1]).order(nativeOrder)); > accessor2 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[2]).order(nativeOrder)); > accessor3 = new > org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[3]).order(nativeOrder)); > accessor4 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[4]).order(nativeOrder)); > accessor5 = new > org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[5]).order(nativeOrder)); > accessor6 = new > org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[6]).order(nativeOrder)); > return hasNext(); > } > public InternalRow next() { > currentRow += 1; > bufferHolder.reset(); > rowWriter.zeroOutNullBytes(); > accessor.extractTo(mutableRow, 0); > accessor1.extractTo(mutableRow, 1); > accessor2.extractTo(mutableRow, 2); > accessor3.extractTo(mutableRow, 3); > accessor4.extractTo(mutableRow, 4); > accessor5.extractTo(mutableRow, 5); > accessor6.extractTo(mutableRow, 6); > unsafeRow.setTotalSize(bufferHolder.totalSize()); > return unsafeRow; > } > } > {noformat} > Diff-ing the two files reveals that the "accessor*" variable definitions are > permuted. 
-- This message was sent by Atlassian JIRA (v6.3.15#6346) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org