[ https://issues.apache.org/jira/browse/SPARK-15822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15331371#comment-15331371 ]
Pete Robbins commented on SPARK-15822:
--------------------------------------
The generated code is:
{code}
Top Arrival Carrier Cancellations:
Found 5 WholeStageCodegen subtrees.
== Subtree 1 / 5 ==
*HashAggregate(key=[Origin#16,UniqueCarrier#8], functions=[partial_count(1)],
output=[Origin#16,UniqueCarrier#8,count#296L])
+- *Project [UniqueCarrier#8, Origin#16]
+- *Filter (((((((isnotnull(Origin#16) && isnotnull(UniqueCarrier#8)) &&
isnotnull(Cancelled#21)) && isnotnull(CancellationCode#22)) && NOT
(Cancelled#21 = 0)) && (CancellationCode#22 = A)) && isnotnull(Dest#17)) &&
(Dest#17 = ORD))
+- *Scan csv
[UniqueCarrier#8,Origin#16,Dest#17,Cancelled#21,CancellationCode#22] Format:
CSV, InputPaths: file:/home/robbins/brandberry/2008.csv, PushedFilters:
[IsNotNull(Origin), IsNotNull(UniqueCarrier), IsNotNull(Cancelled),
IsNotNull(CancellationCode), ..., ReadSchema:
struct<UniqueCarrier:string,Origin:string,Dest:string,Cancelled:int,CancellationCode:string>
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends
org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */ private Object[] references;
/* 007 */ private boolean agg_initAgg;
/* 008 */ private boolean agg_bufIsNull;
/* 009 */ private long agg_bufValue;
/* 010 */ private agg_VectorizedHashMap agg_vectorizedHashMap;
/* 011 */ private
java.util.Iterator<org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row>
agg_vectorizedHashMapIter;
/* 012 */ private org.apache.spark.sql.execution.aggregate.HashAggregateExec
agg_plan;
/* 013 */ private
org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
/* 014 */ private org.apache.spark.sql.execution.UnsafeKVExternalSorter
agg_sorter;
/* 015 */ private org.apache.spark.unsafe.KVIterator agg_mapIter;
/* 016 */ private org.apache.spark.sql.execution.metric.SQLMetric
agg_peakMemory;
/* 017 */ private org.apache.spark.sql.execution.metric.SQLMetric
agg_spillSize;
/* 018 */ private org.apache.spark.sql.execution.metric.SQLMetric
scan_numOutputRows;
/* 019 */ private scala.collection.Iterator scan_input;
/* 020 */ private org.apache.spark.sql.execution.metric.SQLMetric
filter_numOutputRows;
/* 021 */ private UnsafeRow filter_result;
/* 022 */ private
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder filter_holder;
/* 023 */ private
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
filter_rowWriter;
/* 024 */ private UnsafeRow project_result;
/* 025 */ private
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder project_holder;
/* 026 */ private
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
project_rowWriter;
/* 027 */ private UnsafeRow agg_result2;
/* 028 */ private
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 029 */ private
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 030 */ private
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowJoiner
agg_unsafeRowJoiner;
/* 031 */ private org.apache.spark.sql.execution.metric.SQLMetric
wholestagecodegen_numOutputRows;
/* 032 */ private org.apache.spark.sql.execution.metric.SQLMetric
wholestagecodegen_aggTime;
/* 033 */ private UnsafeRow wholestagecodegen_result;
/* 034 */ private
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder
wholestagecodegen_holder;
/* 035 */ private
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
wholestagecodegen_rowWriter;
/* 036 */
/* 037 */ public GeneratedIterator(Object[] references) {
/* 038 */ this.references = references;
/* 039 */ }
/* 040 */
/* 041 */ public void init(int index, scala.collection.Iterator inputs[]) {
/* 042 */ partitionIndex = index;
/* 043 */ agg_initAgg = false;
/* 044 */
/* 045 */ agg_vectorizedHashMap = new agg_VectorizedHashMap();
/* 046 */
/* 047 */ this.agg_plan =
(org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0];
/* 048 */
/* 049 */ this.agg_peakMemory =
(org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 050 */ this.agg_spillSize =
(org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 051 */ this.scan_numOutputRows =
(org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 052 */ scan_input = inputs[0];
/* 053 */ this.filter_numOutputRows =
(org.apache.spark.sql.execution.metric.SQLMetric) references[4];
/* 054 */ filter_result = new UnsafeRow(5);
/* 055 */ this.filter_holder = new
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(filter_result,
128);
/* 056 */ this.filter_rowWriter = new
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(filter_holder,
5);
/* 057 */ project_result = new UnsafeRow(2);
/* 058 */ this.project_holder = new
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(project_result,
64);
/* 059 */ this.project_rowWriter = new
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_holder,
2);
/* 060 */ agg_result2 = new UnsafeRow(2);
/* 061 */ this.agg_holder = new
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result2, 64);
/* 062 */ this.agg_rowWriter = new
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder,
2);
/* 063 */ agg_unsafeRowJoiner = agg_plan.createUnsafeJoiner();
/* 064 */ this.wholestagecodegen_numOutputRows =
(org.apache.spark.sql.execution.metric.SQLMetric) references[7];
/* 065 */ this.wholestagecodegen_aggTime =
(org.apache.spark.sql.execution.metric.SQLMetric) references[8];
/* 066 */ wholestagecodegen_result = new UnsafeRow(3);
/* 067 */ this.wholestagecodegen_holder = new
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(wholestagecodegen_result,
64);
/* 068 */ this.wholestagecodegen_rowWriter = new
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(wholestagecodegen_holder,
3);
/* 069 */ }
/* 070 */
/* 071 */ public class agg_VectorizedHashMap {
/* 072 */ private org.apache.spark.sql.execution.vectorized.ColumnarBatch
batch;
/* 073 */ private org.apache.spark.sql.execution.vectorized.ColumnarBatch
aggregateBufferBatch;
/* 074 */ private int[] buckets;
/* 075 */ private int capacity = 1 << 16;
/* 076 */ private double loadFactor = 0.5;
/* 077 */ private int numBuckets = (int) (capacity / loadFactor);
/* 078 */ private int maxSteps = 2;
/* 079 */ private int numRows = 0;
/* 080 */ private org.apache.spark.sql.types.StructType schema = new
org.apache.spark.sql.types.StructType().add("Origin",
org.apache.spark.sql.types.DataTypes.StringType)
/* 081 */ .add("UniqueCarrier",
org.apache.spark.sql.types.DataTypes.StringType)
/* 082 */ .add("count", org.apache.spark.sql.types.DataTypes.LongType);
/* 083 */ private org.apache.spark.sql.types.StructType
aggregateBufferSchema =
/* 084 */ new org.apache.spark.sql.types.StructType().add("count",
org.apache.spark.sql.types.DataTypes.LongType);
/* 085 */
/* 086 */ public agg_VectorizedHashMap() {
/* 087 */ batch =
org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema,
/* 088 */ org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
/* 089 */ // TODO: Possibly generate this projection in HashAggregate directly
/* 090 */ aggregateBufferBatch =
org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(
/* 091 */ aggregateBufferSchema,
org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
/* 092 */ for (int i = 0 ; i < aggregateBufferBatch.numCols(); i++) {
/* 093 */ aggregateBufferBatch.setColumn(i, batch.column(i+2));
/* 094 */ }
/* 095 */
/* 096 */ buckets = new int[numBuckets];
/* 097 */ java.util.Arrays.fill(buckets, -1);
/* 098 */ }
/* 099 */
/* 100 */ public
org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row
findOrInsert(UTF8String agg_key, UTF8String agg_key1) {
/* 101 */ long h = hash(agg_key, agg_key1);
/* 102 */ int step = 0;
/* 103 */ int idx = (int) h & (numBuckets - 1);
/* 104 */ while (step < maxSteps) {
/* 105 */ // Return bucket index if it's either an empty slot or already contains the key
/* 106 */ if (buckets[idx] == -1) {
/* 107 */ if (numRows < capacity) {
/* 108 */ // Initialize aggregate keys
/* 109 */ batch.column(0).putByteArray(numRows, agg_key.getBytes());
/* 110 */ batch.column(1).putByteArray(numRows,
agg_key1.getBytes());
/* 111 */
/* 112 */ agg_bufIsNull = false;
/* 113 */ agg_bufValue = 0L;
/* 114 */
/* 115 */ // Initialize aggregate values
/* 116 */
/* 117 */ if (!agg_bufIsNull) {
/* 118 */ batch.column(2).putLong(numRows, agg_bufValue);
/* 119 */ } else {
/* 120 */ batch.column(2).putNull(numRows);
/* 121 */ }
/* 122 */
/* 123 */ buckets[idx] = numRows++;
/* 124 */ batch.setNumRows(numRows);
/* 125 */ aggregateBufferBatch.setNumRows(numRows);
/* 126 */ return aggregateBufferBatch.getRow(buckets[idx]);
/* 127 */ } else {
/* 128 */ // No more space
/* 129 */ return null;
/* 130 */ }
/* 131 */ } else if (equals(idx, agg_key, agg_key1)) {
/* 132 */ return aggregateBufferBatch.getRow(buckets[idx]);
/* 133 */ }
/* 134 */ idx = (idx + 1) & (numBuckets - 1);
/* 135 */ step++;
/* 136 */ }
/* 137 */ // Didn't find it
/* 138 */ return null;
/* 139 */ }
/* 140 */
/* 141 */ private boolean equals(int idx, UTF8String agg_key, UTF8String
agg_key1) {
/* 142 */ return
(batch.column(0).getUTF8String(buckets[idx]).equals(agg_key)) &&
(batch.column(1).getUTF8String(buckets[idx]).equals(agg_key1));
/* 143 */ }
/* 144 */
/* 145 */ private long hash(UTF8String agg_key, UTF8String agg_key1) {
/* 146 */ long agg_hash = 0;
/* 147 */
/* 148 */ int agg_result = 0;
/* 149 */ for (int i = 0; i < agg_key.getBytes().length; i++) {
/* 150 */ int agg_hash1 = agg_key.getBytes()[i];
/* 151 */ agg_result = (agg_result ^ (0x9e3779b9)) + agg_hash1 +
(agg_result << 6) + (agg_result >>> 2);
/* 152 */ }
/* 153 */
/* 154 */ agg_hash = (agg_hash ^ (0x9e3779b9)) + agg_result + (agg_hash
<< 6) + (agg_hash >>> 2);
/* 155 */
/* 156 */ int agg_result1 = 0;
/* 157 */ for (int i = 0; i < agg_key1.getBytes().length; i++) {
/* 158 */ int agg_hash2 = agg_key1.getBytes()[i];
/* 159 */ agg_result1 = (agg_result1 ^ (0x9e3779b9)) + agg_hash2 +
(agg_result1 << 6) + (agg_result1 >>> 2);
/* 160 */ }
/* 161 */
/* 162 */ agg_hash = (agg_hash ^ (0x9e3779b9)) + agg_result1 + (agg_hash
<< 6) + (agg_hash >>> 2);
/* 163 */
/* 164 */ return agg_hash;
/* 165 */ }
/* 166 */
/* 167 */ public
java.util.Iterator<org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row>
/* 168 */ rowIterator() {
/* 169 */ return batch.rowIterator();
/* 170 */ }
/* 171 */
/* 172 */ public void close() {
/* 173 */ batch.close();
/* 174 */ }
/* 175 */
/* 176 */ }
/* 177 */
/* 178 */ private void agg_doAggregateWithKeys() throws java.io.IOException {
/* 179 */ agg_hashMap = agg_plan.createHashMap();
/* 180 */
/* 181 */ while (scan_input.hasNext()) {
/* 182 */ InternalRow scan_row = (InternalRow) scan_input.next();
/* 183 */ scan_numOutputRows.add(1);
/* 184 */ boolean scan_isNull8 = scan_row.isNullAt(3);
/* 185 */ int scan_value8 = scan_isNull8 ? -1 : (scan_row.getInt(3));
/* 186 */
/* 187 */ if (!(!(scan_isNull8))) continue;
/* 188 */
/* 189 */ boolean filter_value3 = false;
/* 190 */ filter_value3 = scan_value8 == 0;
/* 191 */ boolean filter_value2 = false;
/* 192 */ filter_value2 = !(filter_value3);
/* 193 */ if (!filter_value2) continue;
/* 194 */ boolean scan_isNull9 = scan_row.isNullAt(4);
/* 195 */ UTF8String scan_value9 = scan_isNull9 ? null :
(scan_row.getUTF8String(4));
/* 196 */
/* 197 */ if (!(!(scan_isNull9))) continue;
/* 198 */
/* 199 */ Object filter_obj = ((Expression) references[5]).eval(null);
/* 200 */ UTF8String filter_value10 = (UTF8String) filter_obj;
/* 201 */ boolean filter_value8 = false;
/* 202 */ filter_value8 = scan_value9.equals(filter_value10);
/* 203 */ if (!filter_value8) continue;
/* 204 */ boolean scan_isNull7 = scan_row.isNullAt(2);
/* 205 */ UTF8String scan_value7 = scan_isNull7 ? null :
(scan_row.getUTF8String(2));
/* 206 */
/* 207 */ if (!(!(scan_isNull7))) continue;
/* 208 */
/* 209 */ Object filter_obj1 = ((Expression) references[6]).eval(null);
/* 210 */ UTF8String filter_value15 = (UTF8String) filter_obj1;
/* 211 */ boolean filter_value13 = false;
/* 212 */ filter_value13 = scan_value7.equals(filter_value15);
/* 213 */ if (!filter_value13) continue;
/* 214 */
/* 215 */ boolean scan_isNull6 = scan_row.isNullAt(1);
/* 216 */ UTF8String scan_value6 = scan_isNull6 ? null :
(scan_row.getUTF8String(1));
/* 217 */
/* 218 */ if (!(!(scan_isNull6))) continue;
/* 219 */
/* 220 */ boolean scan_isNull5 = scan_row.isNullAt(0);
/* 221 */ UTF8String scan_value5 = scan_isNull5 ? null :
(scan_row.getUTF8String(0));
/* 222 */
/* 223 */ if (!(!(scan_isNull5))) continue;
/* 224 */
/* 225 */ filter_numOutputRows.add(1);
/* 226 */
/* 227 */ UnsafeRow agg_unsafeRowAggBuffer = null;
/* 228 */ org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row
agg_vectorizedAggBuffer = null;
/* 229 */
/* 230 */ if (true) {
/* 231 */ if (!false && !false) {
/* 232 */ agg_vectorizedAggBuffer =
agg_vectorizedHashMap.findOrInsert(
/* 233 */ scan_value6, scan_value5);
/* 234 */ }
/* 235 */ }
/* 236 */
/* 237 */ if (agg_vectorizedAggBuffer == null) {
/* 238 */ // generate grouping key
/* 239 */ agg_holder.reset();
/* 240 */
/* 241 */ agg_rowWriter.write(0, scan_value6);
/* 242 */
/* 243 */ agg_rowWriter.write(1, scan_value5);
/* 244 */ agg_result2.setTotalSize(agg_holder.totalSize());
/* 245 */ int agg_value6 = 42;
/* 246 */
/* 247 */ if (!false) {
/* 248 */ agg_value6 =
org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(scan_value6.getBaseObject(),
scan_value6.getBaseOffset(), scan_value6.numBytes(), agg_value6);
/* 249 */ }
/* 250 */
/* 251 */ if (!false) {
/* 252 */ agg_value6 =
org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(scan_value5.getBaseObject(),
scan_value5.getBaseOffset(), scan_value5.numBytes(), agg_value6);
/* 253 */ }
/* 254 */ if (true) {
/* 255 */ // try to get the buffer from hash map
/* 256 */ agg_unsafeRowAggBuffer =
/* 257 */ agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result2,
agg_value6);
/* 258 */ }
/* 259 */ if (agg_unsafeRowAggBuffer == null) {
/* 260 */ if (agg_sorter == null) {
/* 261 */ agg_sorter =
agg_hashMap.destructAndCreateExternalSorter();
/* 262 */ } else {
/* 263 */
agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
/* 264 */ }
/* 265 */
/* 266 */ // the hash map had be spilled, it should have enough memory now,
/* 267 */ // try to allocate buffer again.
/* 268 */ agg_unsafeRowAggBuffer =
/* 269 */ agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result2,
agg_value6);
/* 270 */ if (agg_unsafeRowAggBuffer == null) {
/* 271 */ // failed to allocate the first page
/* 272 */ throw new OutOfMemoryError("No enough memory for aggregation");
/* 273 */ }
/* 274 */ }
/* 275 */ }
/* 276 */
/* 277 */ if (agg_vectorizedAggBuffer != null) {
/* 278 */ // update vectorized row
/* 279 */
/* 280 */ // common sub-expressions
/* 281 */
/* 282 */ // evaluate aggregate function
/* 283 */ long agg_value10 = agg_vectorizedAggBuffer.getLong(0);
/* 284 */
/* 285 */ long agg_value9 = -1L;
/* 286 */ agg_value9 = agg_value10 + 1L;
/* 287 */ // update vectorized row
/* 288 */ agg_vectorizedAggBuffer.setLong(0, agg_value9);
/* 289 */
/* 290 */ } else {
/* 291 */ // update unsafe row
/* 292 */
/* 293 */ // common sub-expressions
/* 294 */
/* 295 */ // evaluate aggregate function
/* 296 */ long agg_value13 = agg_unsafeRowAggBuffer.getLong(0);
/* 297 */
/* 298 */ long agg_value12 = -1L;
/* 299 */ agg_value12 = agg_value13 + 1L;
/* 300 */ // update unsafe row buffer
/* 301 */ agg_unsafeRowAggBuffer.setLong(0, agg_value12);
/* 302 */
/* 303 */ }
/* 304 */ if (shouldStop()) return;
/* 305 */ }
/* 306 */
/* 307 */ agg_vectorizedHashMapIter = agg_vectorizedHashMap.rowIterator();
/* 308 */
/* 309 */ agg_mapIter = agg_plan.finishAggregate(agg_hashMap, agg_sorter,
agg_peakMemory, agg_spillSize);
/* 310 */ }
/* 311 */
/* 312 */ protected void processNext() throws java.io.IOException {
/* 313 */ if (!agg_initAgg) {
/* 314 */ agg_initAgg = true;
/* 315 */ long wholestagecodegen_beforeAgg = System.nanoTime();
/* 316 */ agg_doAggregateWithKeys();
/* 317 */ wholestagecodegen_aggTime.add((System.nanoTime() -
wholestagecodegen_beforeAgg) / 1000000);
/* 318 */ }
/* 319 */
/* 320 */ // output the result
/* 321 */
/* 322 */ while (agg_vectorizedHashMapIter.hasNext()) {
/* 323 */ wholestagecodegen_numOutputRows.add(1);
/* 324 */ org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row
wholestagecodegen_vectorizedHashMapRow =
/* 325 */ (org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row)
/* 326 */ agg_vectorizedHashMapIter.next();
/* 327 */
/* 328 */ wholestagecodegen_holder.reset();
/* 329 */
/* 330 */ wholestagecodegen_rowWriter.zeroOutNullBytes();
/* 331 */
/* 332 */ boolean wholestagecodegen_isNull =
wholestagecodegen_vectorizedHashMapRow.isNullAt(0);
/* 333 */ UTF8String wholestagecodegen_value = wholestagecodegen_isNull ?
null : (wholestagecodegen_vectorizedHashMapRow.getUTF8String(0));
/* 334 */ if (wholestagecodegen_isNull) {
/* 335 */ wholestagecodegen_rowWriter.setNullAt(0);
/* 336 */ } else {
/* 337 */ wholestagecodegen_rowWriter.write(0, wholestagecodegen_value);
/* 338 */ }
/* 339 */
/* 340 */ boolean wholestagecodegen_isNull1 =
wholestagecodegen_vectorizedHashMapRow.isNullAt(1);
/* 341 */ UTF8String wholestagecodegen_value1 = wholestagecodegen_isNull1
? null : (wholestagecodegen_vectorizedHashMapRow.getUTF8String(1));
/* 342 */ if (wholestagecodegen_isNull1) {
/* 343 */ wholestagecodegen_rowWriter.setNullAt(1);
/* 344 */ } else {
/* 345 */ wholestagecodegen_rowWriter.write(1,
wholestagecodegen_value1);
/* 346 */ }
/* 347 */
/* 348 */ long wholestagecodegen_value2 =
wholestagecodegen_vectorizedHashMapRow.getLong(2);
/* 349 */ wholestagecodegen_rowWriter.write(2, wholestagecodegen_value2);
/* 350 */
wholestagecodegen_result.setTotalSize(wholestagecodegen_holder.totalSize());
/* 351 */
/* 352 */ append(wholestagecodegen_result);
/* 353 */
/* 354 */ if (shouldStop()) return;
/* 355 */ }
/* 356 */
/* 357 */ agg_vectorizedHashMap.close();
/* 358 */
/* 359 */ while (agg_mapIter.next()) {
/* 360 */ wholestagecodegen_numOutputRows.add(1);
/* 361 */ UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 362 */ UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();
/* 363 */
/* 364 */ UnsafeRow agg_resultRow = agg_unsafeRowJoiner.join(agg_aggKey,
agg_aggBuffer);
/* 365 */
/* 366 */ append(agg_resultRow);
/* 367 */
/* 368 */ if (shouldStop()) return;
/* 369 */ }
/* 370 */
/* 371 */ agg_mapIter.close();
/* 372 */ if (agg_sorter == null) {
/* 373 */ agg_hashMap.free();
/* 374 */ }
/* 375 */ }
/* 376 */ }
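//
// --- Editor's note (not part of the generated output above) ----------------
// Subtree 1 is the partial count(1) aggregation. Each surviving row first
// tries the generated agg_VectorizedHashMap: a ColumnarBatch-backed,
// open-addressing map probed at most maxSteps = 2 slots. numBuckets =
// capacity / loadFactor = 1 << 17 is a power of two, so "(int) h &
// (numBuckets - 1)" is a cheap modulo. On a miss the row falls back to the
// row-based UnsafeFixedWidthAggregationMap, which can spill into an
// UnsafeKVExternalSorter. The plain-Java sketch below only illustrates that
// probe/insert/fallback control flow; String keys stand in for the two
// UTF8String grouping columns, and none of these names exist in Spark.
final class ProbeSketch {
    private static final int CAPACITY = 1 << 16;
    private static final int NUM_BUCKETS = CAPACITY * 2;   // load factor 0.5
    private static final int MAX_STEPS = 2;
    private final int[] buckets = new int[NUM_BUCKETS];
    private final String[] keys = new String[CAPACITY];
    final long[] counts = new long[CAPACITY];               // aggregate buffers
    private int numRows = 0;

    ProbeSketch() { java.util.Arrays.fill(buckets, -1); }

    /** Returns the row index holding this key's count, or -1 to signal that
     *  the caller should fall back to the regular (spillable) aggregation map. */
    int findOrInsert(String key) {
        int idx = key.hashCode() & (NUM_BUCKETS - 1);
        for (int step = 0; step < MAX_STEPS; step++) {
            if (buckets[idx] == -1) {                 // empty slot: insert key
                if (numRows >= CAPACITY) return -1;   // batch is full
                keys[numRows] = key;
                counts[numRows] = 0L;                 // initialize aggregate value
                buckets[idx] = numRows++;
                return buckets[idx];
            } else if (keys[buckets[idx]].equals(key)) {
                return buckets[idx];                  // key already present
            }
            idx = (idx + 1) & (NUM_BUCKETS - 1);      // linear probing
        }
        return -1;                                    // too many collisions
    }
}
// ----------------------------------------------------------------------------
//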
== Subtree 2 / 5 ==
*Project [Origin#16, UniqueCarrier#8, round((cast((count#134L * 100) as double)
/ cast(total#97L as double)), 2) AS rank#173]
+- *SortMergeJoin [Origin#16, UniqueCarrier#8], [Origin#155,
UniqueCarrier#147], Inner
:- *Sort [Origin#16 ASC, UniqueCarrier#8 ASC], false, 0
: +- *HashAggregate(key=[Origin#16,UniqueCarrier#8], functions=[count(1)],
output=[Origin#16,UniqueCarrier#8,count#134L])
: +- Exchange hashpartitioning(Origin#16, UniqueCarrier#8, 200)
: +- *HashAggregate(key=[Origin#16,UniqueCarrier#8],
functions=[partial_count(1)], output=[Origin#16,UniqueCarrier#8,count#296L])
: +- *Project [UniqueCarrier#8, Origin#16]
: +- *Filter (((((((isnotnull(Origin#16) &&
isnotnull(UniqueCarrier#8)) && isnotnull(Cancelled#21)) &&
isnotnull(CancellationCode#22)) && NOT (Cancelled#21 = 0)) &&
(CancellationCode#22 = A)) && isnotnull(Dest#17)) && (Dest#17 = ORD))
: +- *Scan csv
[UniqueCarrier#8,Origin#16,Dest#17,Cancelled#21,CancellationCode#22] Format:
CSV, InputPaths: file:/home/robbins/brandberry/2008.csv, PushedFilters:
[IsNotNull(Origin), IsNotNull(UniqueCarrier), IsNotNull(Cancelled),
IsNotNull(CancellationCode), ..., ReadSchema:
struct<UniqueCarrier:string,Origin:string,Dest:string,Cancelled:int,CancellationCode:string>
+- *Sort [Origin#155 ASC, UniqueCarrier#147 ASC], false, 0
+- *HashAggregate(key=[Origin#155,UniqueCarrier#147],
functions=[count(1)], output=[Origin#155,UniqueCarrier#147,total#97L])
+- Exchange hashpartitioning(Origin#155, UniqueCarrier#147, 200)
+- *HashAggregate(key=[Origin#155,UniqueCarrier#147],
functions=[partial_count(1)], output=[Origin#155,UniqueCarrier#147,count#303L])
+- *Project [UniqueCarrier#147, Origin#155]
+- *Filter (((isnotnull(UniqueCarrier#147) &&
isnotnull(Origin#155)) && isnotnull(Dest#156)) && (Dest#156 = ORD))
+- *Scan csv [UniqueCarrier#147,Origin#155,Dest#156]
Format: CSV, InputPaths: file:/home/robbins/brandberry/2008.csv, PushedFilters:
[IsNotNull(UniqueCarrier), IsNotNull(Origin), IsNotNull(Dest),
EqualTo(Dest,ORD)], ReadSchema:
struct<UniqueCarrier:string,Origin:string,Dest:string>
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends
org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */ private Object[] references;
/* 007 */ private scala.collection.Iterator smj_leftInput;
/* 008 */ private scala.collection.Iterator smj_rightInput;
/* 009 */ private InternalRow smj_leftRow;
/* 010 */ private InternalRow smj_rightRow;
/* 011 */ private UTF8String smj_value4;
/* 012 */ private UTF8String smj_value5;
/* 013 */ private java.util.ArrayList smj_matches;
/* 014 */ private UTF8String smj_value6;
/* 015 */ private UTF8String smj_value7;
/* 016 */ private UTF8String smj_value8;
/* 017 */ private boolean smj_isNull4;
/* 018 */ private UTF8String smj_value9;
/* 019 */ private boolean smj_isNull5;
/* 020 */ private long smj_value10;
/* 021 */ private org.apache.spark.sql.execution.metric.SQLMetric
smj_numOutputRows;
/* 022 */ private UnsafeRow smj_result;
/* 023 */ private
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder smj_holder;
/* 024 */ private
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter smj_rowWriter;
/* 025 */ private UnsafeRow project_result;
/* 026 */ private
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder project_holder;
/* 027 */ private
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
project_rowWriter;
/* 028 */
/* 029 */ public GeneratedIterator(Object[] references) {
/* 030 */ this.references = references;
/* 031 */ }
/* 032 */
/* 033 */ public void init(int index, scala.collection.Iterator inputs[]) {
/* 034 */ partitionIndex = index;
/* 035 */ smj_leftInput = inputs[0];
/* 036 */ smj_rightInput = inputs[1];
/* 037 */
/* 038 */ smj_rightRow = null;
/* 039 */
/* 040 */ smj_matches = new java.util.ArrayList();
/* 041 */
/* 042 */ this.smj_numOutputRows =
(org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 043 */ smj_result = new UnsafeRow(6);
/* 044 */ this.smj_holder = new
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(smj_result, 128);
/* 045 */ this.smj_rowWriter = new
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(smj_holder,
6);
/* 046 */ project_result = new UnsafeRow(3);
/* 047 */ this.project_holder = new
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(project_result,
64);
/* 048 */ this.project_rowWriter = new
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_holder,
3);
/* 049 */ }
/* 050 */
/* 051 */ private boolean findNextInnerJoinRows(
/* 052 */ scala.collection.Iterator leftIter,
/* 053 */ scala.collection.Iterator rightIter) {
/* 054 */ smj_leftRow = null;
/* 055 */ int comp = 0;
/* 056 */ while (smj_leftRow == null) {
/* 057 */ if (!leftIter.hasNext()) return false;
/* 058 */ smj_leftRow = (InternalRow) leftIter.next();
/* 059 */
/* 060 */ boolean smj_isNull = smj_leftRow.isNullAt(0);
/* 061 */ UTF8String smj_value = smj_isNull ? null :
(smj_leftRow.getUTF8String(0));
/* 062 */
/* 063 */ boolean smj_isNull1 = smj_leftRow.isNullAt(1);
/* 064 */ UTF8String smj_value1 = smj_isNull1 ? null :
(smj_leftRow.getUTF8String(1));
/* 065 */ if (smj_isNull || smj_isNull1) {
/* 066 */ smj_leftRow = null;
/* 067 */ continue;
/* 068 */ }
/* 069 */ if (!smj_matches.isEmpty()) {
/* 070 */ comp = 0;
/* 071 */ if (comp == 0) {
/* 072 */ comp = smj_value.compare(smj_value6);
/* 073 */ }
/* 074 */ if (comp == 0) {
/* 075 */ comp = smj_value1.compare(smj_value7);
/* 076 */ }
/* 077 */
/* 078 */ if (comp == 0) {
/* 079 */ return true;
/* 080 */ }
/* 081 */ smj_matches.clear();
/* 082 */ }
/* 083 */
/* 084 */ do {
/* 085 */ if (smj_rightRow == null) {
/* 086 */ if (!rightIter.hasNext()) {
/* 087 */ smj_value6 = smj_value;
/* 088 */
/* 089 */ smj_value7 = smj_value1;
/* 090 */
/* 091 */ return !smj_matches.isEmpty();
/* 092 */ }
/* 093 */ smj_rightRow = (InternalRow) rightIter.next();
/* 094 */
/* 095 */ boolean smj_isNull2 = smj_rightRow.isNullAt(0);
/* 096 */ UTF8String smj_value2 = smj_isNull2 ? null :
(smj_rightRow.getUTF8String(0));
/* 097 */
/* 098 */ boolean smj_isNull3 = smj_rightRow.isNullAt(1);
/* 099 */ UTF8String smj_value3 = smj_isNull3 ? null :
(smj_rightRow.getUTF8String(1));
/* 100 */ if (smj_isNull2 || smj_isNull3) {
/* 101 */ smj_rightRow = null;
/* 102 */ continue;
/* 103 */ }
/* 104 */
/* 105 */ smj_value4 = smj_value2;
/* 106 */
/* 107 */ smj_value5 = smj_value3;
/* 108 */
/* 109 */ }
/* 110 */
/* 111 */ comp = 0;
/* 112 */ if (comp == 0) {
/* 113 */ comp = smj_value.compare(smj_value4);
/* 114 */ }
/* 115 */ if (comp == 0) {
/* 116 */ comp = smj_value1.compare(smj_value5);
/* 117 */ }
/* 118 */
/* 119 */ if (comp > 0) {
/* 120 */ smj_rightRow = null;
/* 121 */ } else if (comp < 0) {
/* 122 */ if (!smj_matches.isEmpty()) {
/* 123 */ smj_value6 = smj_value;
/* 124 */
/* 125 */ smj_value7 = smj_value1;
/* 126 */
/* 127 */ return true;
/* 128 */ }
/* 129 */ smj_leftRow = null;
/* 130 */ } else {
/* 131 */ smj_matches.add(smj_rightRow.copy());
/* 132 */ smj_rightRow = null;;
/* 133 */ }
/* 134 */ } while (smj_leftRow != null);
/* 135 */ }
/* 136 */ return false; // unreachable
/* 137 */ }
/* 138 */
/* 139 */ protected void processNext() throws java.io.IOException {
/* 140 */ while (findNextInnerJoinRows(smj_leftInput, smj_rightInput)) {
/* 141 */ int smj_size = smj_matches.size();
/* 142 */ smj_isNull4 = smj_leftRow.isNullAt(0);
/* 143 */ smj_value8 = smj_isNull4 ? null :
(smj_leftRow.getUTF8String(0));
/* 144 */ smj_isNull5 = smj_leftRow.isNullAt(1);
/* 145 */ smj_value9 = smj_isNull5 ? null :
(smj_leftRow.getUTF8String(1));
/* 146 */ smj_value10 = smj_leftRow.getLong(2);
/* 147 */ for (int smj_i = 0; smj_i < smj_size; smj_i ++) {
/* 148 */ InternalRow smj_rightRow1 = (InternalRow)
smj_matches.get(smj_i);
/* 149 */
/* 150 */ smj_numOutputRows.add(1);
/* 151 */
/* 152 */ long smj_value13 = smj_rightRow1.getLong(2);
/* 153 */ boolean project_isNull8 = false;
/* 154 */ double project_value8 = -1.0;
/* 155 */ if (!false) {
/* 156 */ project_value8 = (double) smj_value13;
/* 157 */ }
/* 158 */ boolean project_isNull3 = false;
/* 159 */ double project_value3 = -1.0;
/* 160 */ if (project_value8 == 0) {
/* 161 */ project_isNull3 = true;
/* 162 */ } else {
/* 163 */ long project_value5 = -1L;
/* 164 */ project_value5 = smj_value10 * 100L;
/* 165 */ boolean project_isNull4 = false;
/* 166 */ double project_value4 = -1.0;
/* 167 */ if (!false) {
/* 168 */ project_value4 = (double) project_value5;
/* 169 */ }
/* 170 */ project_value3 = (double)(project_value4 / project_value8);
/* 171 */ }
/* 172 */ boolean project_isNull2 = project_isNull3;
/* 173 */ double project_value2 = -1.0;
/* 174 */ if (!project_isNull2) {
/* 175 */ if (Double.isNaN(project_value3) ||
Double.isInfinite(project_value3)) {
/* 176 */ project_value2 = project_value3;
/* 177 */ } else {
/* 178 */ project_value2 =
java.math.BigDecimal.valueOf(project_value3).
/* 179 */ setScale(2,
java.math.BigDecimal.ROUND_HALF_UP).doubleValue();
/* 180 */ }
/* 181 */ }
/* 182 */ project_holder.reset();
/* 183 */
/* 184 */ project_rowWriter.zeroOutNullBytes();
/* 185 */
/* 186 */ if (smj_isNull4) {
/* 187 */ project_rowWriter.setNullAt(0);
/* 188 */ } else {
/* 189 */ project_rowWriter.write(0, smj_value8);
/* 190 */ }
/* 191 */
/* 192 */ if (smj_isNull5) {
/* 193 */ project_rowWriter.setNullAt(1);
/* 194 */ } else {
/* 195 */ project_rowWriter.write(1, smj_value9);
/* 196 */ }
/* 197 */
/* 198 */ if (project_isNull2) {
/* 199 */ project_rowWriter.setNullAt(2);
/* 200 */ } else {
/* 201 */ project_rowWriter.write(2, project_value2);
/* 202 */ }
/* 203 */ project_result.setTotalSize(project_holder.totalSize());
/* 204 */ append(project_result.copy());
/* 205 */
/* 206 */ }
/* 207 */ if (shouldStop()) return;
/* 208 */ }
/* 209 */ }
/* 210 */ }
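//
// --- Editor's note (an assumption, not taken from the comment) -------------
// A Dataset query of roughly the following shape would produce the subtree 2
// plan above: A-coded cancellations per (Origin, UniqueCarrier) into ORD,
// divided by that pair's total flights into ORD, rounded to two decimals.
// Column names and the CSV path are taken from the plan; variable names,
// reader options and the exact API calls are illustrative only.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.*;

final class QuerySketch {
    static Dataset<Row> cancellationRank(SparkSession spark) {
        Dataset<Row> flights = spark.read().format("csv")
            .option("header", "true").option("inferSchema", "true")
            .load("file:/home/robbins/brandberry/2008.csv");

        // Left side of the join: count(1) of A-coded cancellations,
        // grouped by (Origin, UniqueCarrier), restricted to Dest = 'ORD'.
        Dataset<Row> cancelled = flights
            .filter("Dest = 'ORD' AND Cancelled <> 0 AND CancellationCode = 'A'")
            .groupBy("Origin", "UniqueCarrier")
            .agg(count(lit(1)).alias("count"));

        // Right side: total flights per (Origin, UniqueCarrier) into ORD.
        Dataset<Row> totals = flights
            .filter("Dest = 'ORD'")
            .groupBy("Origin", "UniqueCarrier")
            .agg(count(lit(1)).alias("total"));

        // Join on both grouping columns and compute
        // round(count * 100 / total, 2) AS rank, as in the Project above.
        return cancelled
            .join(totals, cancelled.col("Origin").equalTo(totals.col("Origin"))
                .and(cancelled.col("UniqueCarrier").equalTo(totals.col("UniqueCarrier"))))
            .select(cancelled.col("Origin"), cancelled.col("UniqueCarrier"),
                round(cancelled.col("count").multiply(100).divide(totals.col("total")), 2)
                    .alias("rank"));
    }
}
// ----------------------------------------------------------------------------
//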
== Subtree 3 / 5 ==
*HashAggregate(key=[Origin#155,UniqueCarrier#147],
functions=[partial_count(1)], output=[Origin#155,UniqueCarrier#147,count#303L])
+- *Project [UniqueCarrier#147, Origin#155]
+- *Filter (((isnotnull(UniqueCarrier#147) && isnotnull(Origin#155)) &&
isnotnull(Dest#156)) && (Dest#156 = ORD))
+- *Scan csv [UniqueCarrier#147,Origin#155,Dest#156] Format: CSV,
InputPaths: file:/home/robbins/brandberry/2008.csv, PushedFilters:
[IsNotNull(UniqueCarrier), IsNotNull(Origin), IsNotNull(Dest),
EqualTo(Dest,ORD)], ReadSchema:
struct<UniqueCarrier:string,Origin:string,Dest:string>
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends
org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */ private Object[] references;
/* 007 */ private boolean agg_initAgg;
/* 008 */ private boolean agg_bufIsNull;
/* 009 */ private long agg_bufValue;
/* 010 */ private agg_VectorizedHashMap agg_vectorizedHashMap;
/* 011 */ private
java.util.Iterator<org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row>
agg_vectorizedHashMapIter;
/* 012 */ private org.apache.spark.sql.execution.aggregate.HashAggregateExec
agg_plan;
/* 013 */ private
org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
/* 014 */ private org.apache.spark.sql.execution.UnsafeKVExternalSorter
agg_sorter;
/* 015 */ private org.apache.spark.unsafe.KVIterator agg_mapIter;
/* 016 */ private org.apache.spark.sql.execution.metric.SQLMetric
agg_peakMemory;
/* 017 */ private org.apache.spark.sql.execution.metric.SQLMetric
agg_spillSize;
/* 018 */ private org.apache.spark.sql.execution.metric.SQLMetric
scan_numOutputRows;
/* 019 */ private scala.collection.Iterator scan_input;
/* 020 */ private org.apache.spark.sql.execution.metric.SQLMetric
filter_numOutputRows;
/* 021 */ private UnsafeRow filter_result;
/* 022 */ private
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder filter_holder;
/* 023 */ private
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
filter_rowWriter;
/* 024 */ private UnsafeRow project_result;
/* 025 */ private
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder project_holder;
/* 026 */ private
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
project_rowWriter;
/* 027 */ private UnsafeRow agg_result2;
/* 028 */ private
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 029 */ private
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 030 */ private
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowJoiner
agg_unsafeRowJoiner;
/* 031 */ private org.apache.spark.sql.execution.metric.SQLMetric
wholestagecodegen_numOutputRows;
/* 032 */ private org.apache.spark.sql.execution.metric.SQLMetric
wholestagecodegen_aggTime;
/* 033 */ private UnsafeRow wholestagecodegen_result;
/* 034 */ private
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder
wholestagecodegen_holder;
/* 035 */ private
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
wholestagecodegen_rowWriter;
/* 036 */
/* 037 */ public GeneratedIterator(Object[] references) {
/* 038 */ this.references = references;
/* 039 */ }
/* 040 */
/* 041 */ public void init(int index, scala.collection.Iterator inputs[]) {
/* 042 */ partitionIndex = index;
/* 043 */ agg_initAgg = false;
/* 044 */
/* 045 */ agg_vectorizedHashMap = new agg_VectorizedHashMap();
/* 046 */
/* 047 */ this.agg_plan =
(org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0];
/* 048 */
/* 049 */ this.agg_peakMemory =
(org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 050 */ this.agg_spillSize =
(org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 051 */ this.scan_numOutputRows =
(org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 052 */ scan_input = inputs[0];
/* 053 */ this.filter_numOutputRows =
(org.apache.spark.sql.execution.metric.SQLMetric) references[4];
/* 054 */ filter_result = new UnsafeRow(3);
/* 055 */ this.filter_holder = new
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(filter_result,
96);
/* 056 */ this.filter_rowWriter = new
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(filter_holder,
3);
/* 057 */ project_result = new UnsafeRow(2);
/* 058 */ this.project_holder = new
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(project_result,
64);
/* 059 */ this.project_rowWriter = new
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_holder,
2);
/* 060 */ agg_result2 = new UnsafeRow(2);
/* 061 */ this.agg_holder = new
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result2, 64);
/* 062 */ this.agg_rowWriter = new
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder,
2);
/* 063 */ agg_unsafeRowJoiner = agg_plan.createUnsafeJoiner();
/* 064 */ this.wholestagecodegen_numOutputRows =
(org.apache.spark.sql.execution.metric.SQLMetric) references[6];
/* 065 */ this.wholestagecodegen_aggTime =
(org.apache.spark.sql.execution.metric.SQLMetric) references[7];
/* 066 */ wholestagecodegen_result = new UnsafeRow(3);
/* 067 */ this.wholestagecodegen_holder = new
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(wholestagecodegen_result,
64);
/* 068 */ this.wholestagecodegen_rowWriter = new
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(wholestagecodegen_holder,
3);
/* 069 */ }
/* 070 */
/* 071 */ public class agg_VectorizedHashMap {
/* 072 */ private org.apache.spark.sql.execution.vectorized.ColumnarBatch
batch;
/* 073 */ private org.apache.spark.sql.execution.vectorized.ColumnarBatch
aggregateBufferBatch;
/* 074 */ private int[] buckets;
/* 075 */ private int capacity = 1 << 16;
/* 076 */ private double loadFactor = 0.5;
/* 077 */ private int numBuckets = (int) (capacity / loadFactor);
/* 078 */ private int maxSteps = 2;
/* 079 */ private int numRows = 0;
/* 080 */ private org.apache.spark.sql.types.StructType schema = new
org.apache.spark.sql.types.StructType().add("Origin",
org.apache.spark.sql.types.DataTypes.StringType)
/* 081 */ .add("UniqueCarrier",
org.apache.spark.sql.types.DataTypes.StringType)
/* 082 */ .add("count", org.apache.spark.sql.types.DataTypes.LongType);
/* 083 */ private org.apache.spark.sql.types.StructType
aggregateBufferSchema =
/* 084 */ new org.apache.spark.sql.types.StructType().add("count",
org.apache.spark.sql.types.DataTypes.LongType);
/* 085 */
/* 086 */ public agg_VectorizedHashMap() {
/* 087 */ batch =
org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema,
/* 088 */ org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
/* 089 */ // TODO: Possibly generate this projection in HashAggregate directly
/* 090 */ aggregateBufferBatch =
org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(
/* 091 */ aggregateBufferSchema,
org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
/* 092 */ for (int i = 0 ; i < aggregateBufferBatch.numCols(); i++) {
/* 093 */ aggregateBufferBatch.setColumn(i, batch.column(i+2));
/* 094 */ }
/* 095 */
/* 096 */ buckets = new int[numBuckets];
/* 097 */ java.util.Arrays.fill(buckets, -1);
/* 098 */ }
/* 099 */
/* 100 */ public
org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row
findOrInsert(UTF8String agg_key, UTF8String agg_key1) {
/* 101 */ long h = hash(agg_key, agg_key1);
/* 102 */ int step = 0;
/* 103 */ int idx = (int) h & (numBuckets - 1);
/* 104 */ while (step < maxSteps) {
/* 105 */ // Return bucket index if it's either an empty slot or already contains the key
/* 106 */ if (buckets[idx] == -1) {
/* 107 */ if (numRows < capacity) {
/* 108 */ // Initialize aggregate keys
/* 109 */ batch.column(0).putByteArray(numRows, agg_key.getBytes());
/* 110 */ batch.column(1).putByteArray(numRows,
agg_key1.getBytes());
/* 111 */
/* 112 */ agg_bufIsNull = false;
/* 113 */ agg_bufValue = 0L;
/* 114 */
/* 115 */ // Initialize aggregate values
/* 116 */
/* 117 */ if (!agg_bufIsNull) {
/* 118 */ batch.column(2).putLong(numRows, agg_bufValue);
/* 119 */ } else {
/* 120 */ batch.column(2).putNull(numRows);
/* 121 */ }
/* 122 */
/* 123 */ buckets[idx] = numRows++;
/* 124 */ batch.setNumRows(numRows);
/* 125 */ aggregateBufferBatch.setNumRows(numRows);
/* 126 */ return aggregateBufferBatch.getRow(buckets[idx]);
/* 127 */ } else {
/* 128 */ // No more space
/* 129 */ return null;
/* 130 */ }
/* 131 */ } else if (equals(idx, agg_key, agg_key1)) {
/* 132 */ return aggregateBufferBatch.getRow(buckets[idx]);
/* 133 */ }
/* 134 */ idx = (idx + 1) & (numBuckets - 1);
/* 135 */ step++;
/* 136 */ }
/* 137 */ // Didn't find it
/* 138 */ return null;
/* 139 */ }
/* 140 */
/* 141 */ private boolean equals(int idx, UTF8String agg_key, UTF8String
agg_key1) {
/* 142 */ return
(batch.column(0).getUTF8String(buckets[idx]).equals(agg_key)) &&
(batch.column(1).getUTF8String(buckets[idx]).equals(agg_key1));
/* 143 */ }
/* 144 */
/* 145 */ private long hash(UTF8String agg_key, UTF8String agg_key1) {
/* 146 */ long agg_hash = 0;
/* 147 */
/* 148 */ int agg_result = 0;
/* 149 */ for (int i = 0; i < agg_key.getBytes().length; i++) {
/* 150 */ int agg_hash1 = agg_key.getBytes()[i];
/* 151 */ agg_result = (agg_result ^ (0x9e3779b9)) + agg_hash1 +
(agg_result << 6) + (agg_result >>> 2);
/* 152 */ }
/* 153 */
/* 154 */ agg_hash = (agg_hash ^ (0x9e3779b9)) + agg_result + (agg_hash
<< 6) + (agg_hash >>> 2);
/* 155 */
/* 156 */ int agg_result1 = 0;
/* 157 */ for (int i = 0; i < agg_key1.getBytes().length; i++) {
/* 158 */ int agg_hash2 = agg_key1.getBytes()[i];
/* 159 */ agg_result1 = (agg_result1 ^ (0x9e3779b9)) + agg_hash2 +
(agg_result1 << 6) + (agg_result1 >>> 2);
/* 160 */ }
/* 161 */
/* 162 */ agg_hash = (agg_hash ^ (0x9e3779b9)) + agg_result1 + (agg_hash
<< 6) + (agg_hash >>> 2);
/* 163 */
/* 164 */ return agg_hash;
/* 165 */ }
/* 166 */
/* 167 */ public
java.util.Iterator<org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row>
/* 168 */ rowIterator() {
/* 169 */ return batch.rowIterator();
/* 170 */ }
/* 171 */
/* 172 */ public void close() {
/* 173 */ batch.close();
/* 174 */ }
/* 175 */
/* 176 */ }
/* 177 */
/* 178 */ private void agg_doAggregateWithKeys() throws java.io.IOException {
/* 179 */ agg_hashMap = agg_plan.createHashMap();
/* 180 */
/* 181 */ while (scan_input.hasNext()) {
/* 182 */ InternalRow scan_row = (InternalRow) scan_input.next();
/* 183 */ scan_numOutputRows.add(1);
/* 184 */ boolean scan_isNull5 = scan_row.isNullAt(2);
/* 185 */ UTF8String scan_value5 = scan_isNull5 ? null :
(scan_row.getUTF8String(2));
/* 186 */
/* 187 */ if (!(!(scan_isNull5))) continue;
/* 188 */
/* 189 */ Object filter_obj = ((Expression) references[5]).eval(null);
/* 190 */ UTF8String filter_value4 = (UTF8String) filter_obj;
/* 191 */ boolean filter_value2 = false;
/* 192 */ filter_value2 = scan_value5.equals(filter_value4);
/* 193 */ if (!filter_value2) continue;
/* 194 */
/* 195 */ boolean scan_isNull3 = scan_row.isNullAt(0);
/* 196 */ UTF8String scan_value3 = scan_isNull3 ? null :
(scan_row.getUTF8String(0));
/* 197 */
/* 198 */ if (!(!(scan_isNull3))) continue;
/* 199 */
/* 200 */ boolean scan_isNull4 = scan_row.isNullAt(1);
/* 201 */ UTF8String scan_value4 = scan_isNull4 ? null :
(scan_row.getUTF8String(1));
/* 202 */
/* 203 */ if (!(!(scan_isNull4))) continue;
/* 204 */
/* 205 */ filter_numOutputRows.add(1);
/* 206 */
/* 207 */ UnsafeRow agg_unsafeRowAggBuffer = null;
/* 208 */ org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row
agg_vectorizedAggBuffer = null;
/* 209 */
/* 210 */ if (true) {
/* 211 */ if (!false && !false) {
/* 212 */ agg_vectorizedAggBuffer =
agg_vectorizedHashMap.findOrInsert(
/* 213 */ scan_value4, scan_value3);
/* 214 */ }
/* 215 */ }
/* 216 */
/* 217 */ if (agg_vectorizedAggBuffer == null) {
/* 218 */ // generate grouping key
/* 219 */ agg_holder.reset();
/* 220 */
/* 221 */ agg_rowWriter.write(0, scan_value4);
/* 222 */
/* 223 */ agg_rowWriter.write(1, scan_value3);
/* 224 */ agg_result2.setTotalSize(agg_holder.totalSize());
/* 225 */ int agg_value6 = 42;
/* 226 */
/* 227 */ if (!false) {
/* 228 */ agg_value6 =
org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(scan_value4.getBaseObject(),
scan_value4.getBaseOffset(), scan_value4.numBytes(), agg_value6);
/* 229 */ }
/* 230 */
/* 231 */ if (!false) {
/* 232 */ agg_value6 =
org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(scan_value3.getBaseObject(),
scan_value3.getBaseOffset(), scan_value3.numBytes(), agg_value6);
/* 233 */ }
/* 234 */ if (true) {
/* 235 */ // try to get the buffer from hash map
/* 236 */ agg_unsafeRowAggBuffer =
/* 237 */ agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result2,
agg_value6);
/* 238 */ }
/* 239 */ if (agg_unsafeRowAggBuffer == null) {
/* 240 */ if (agg_sorter == null) {
/* 241 */ agg_sorter =
agg_hashMap.destructAndCreateExternalSorter();
/* 242 */ } else {
/* 243 */
agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
/* 244 */ }
/* 245 */
/* 246 */ // the hash map had be spilled, it should have enough memory now,
/* 247 */ // try to allocate buffer again.
/* 248 */ agg_unsafeRowAggBuffer =
/* 249 */ agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result2,
agg_value6);
/* 250 */ if (agg_unsafeRowAggBuffer == null) {
/* 251 */ // failed to allocate the first page
/* 252 */ throw new OutOfMemoryError("No enough memory for aggregation");
/* 253 */ }
/* 254 */ }
/* 255 */ }
/* 256 */
/* 257 */ if (agg_vectorizedAggBuffer != null) {
/* 258 */ // update vectorized row
/* 259 */
/* 260 */ // common sub-expressions
/* 261 */
/* 262 */ // evaluate aggregate function
/* 263 */ long agg_value10 = agg_vectorizedAggBuffer.getLong(0);
/* 264 */
/* 265 */ long agg_value9 = -1L;
/* 266 */ agg_value9 = agg_value10 + 1L;
/* 267 */ // update vectorized row
/* 268 */ agg_vectorizedAggBuffer.setLong(0, agg_value9);
/* 269 */
/* 270 */ } else {
/* 271 */ // update unsafe row
/* 272 */
/* 273 */ // common sub-expressions
/* 274 */
/* 275 */ // evaluate aggregate function
/* 276 */ long agg_value13 = agg_unsafeRowAggBuffer.getLong(0);
/* 277 */
/* 278 */ long agg_value12 = -1L;
/* 279 */ agg_value12 = agg_value13 + 1L;
/* 280 */ // update unsafe row buffer
/* 281 */ agg_unsafeRowAggBuffer.setLong(0, agg_value12);
/* 282 */
/* 283 */ }
/* 284 */ if (shouldStop()) return;
/* 285 */ }
/* 286 */
/* 287 */ agg_vectorizedHashMapIter = agg_vectorizedHashMap.rowIterator();
/* 288 */
/* 289 */ agg_mapIter = agg_plan.finishAggregate(agg_hashMap, agg_sorter,
agg_peakMemory, agg_spillSize);
/* 290 */ }
/* 291 */
/* 292 */ protected void processNext() throws java.io.IOException {
/* 293 */ if (!agg_initAgg) {
/* 294 */ agg_initAgg = true;
/* 295 */ long wholestagecodegen_beforeAgg = System.nanoTime();
/* 296 */ agg_doAggregateWithKeys();
/* 297 */ wholestagecodegen_aggTime.add((System.nanoTime() -
wholestagecodegen_beforeAgg) / 1000000);
/* 298 */ }
/* 299 */
/* 300 */ // output the result
/* 301 */
/* 302 */ while (agg_vectorizedHashMapIter.hasNext()) {
/* 303 */ wholestagecodegen_numOutputRows.add(1);
/* 304 */ org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row
wholestagecodegen_vectorizedHashMapRow =
/* 305 */ (org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row)
/* 306 */ agg_vectorizedHashMapIter.next();
/* 307 */
/* 308 */ wholestagecodegen_holder.reset();
/* 309 */
/* 310 */ wholestagecodegen_rowWriter.zeroOutNullBytes();
/* 311 */
/* 312 */ boolean wholestagecodegen_isNull =
wholestagecodegen_vectorizedHashMapRow.isNullAt(0);
/* 313 */ UTF8String wholestagecodegen_value = wholestagecodegen_isNull ?
null : (wholestagecodegen_vectorizedHashMapRow.getUTF8String(0));
/* 314 */ if (wholestagecodegen_isNull) {
/* 315 */ wholestagecodegen_rowWriter.setNullAt(0);
/* 316 */ } else {
/* 317 */ wholestagecodegen_rowWriter.write(0, wholestagecodegen_value);
/* 318 */ }
/* 319 */
/* 320 */ boolean wholestagecodegen_isNull1 =
wholestagecodegen_vectorizedHashMapRow.isNullAt(1);
/* 321 */ UTF8String wholestagecodegen_value1 = wholestagecodegen_isNull1
? null : (wholestagecodegen_vectorizedHashMapRow.getUTF8String(1));
/* 322 */ if (wholestagecodegen_isNull1) {
/* 323 */ wholestagecodegen_rowWriter.setNullAt(1);
/* 324 */ } else {
/* 325 */ wholestagecodegen_rowWriter.write(1,
wholestagecodegen_value1);
/* 326 */ }
/* 327 */
/* 328 */ long wholestagecodegen_value2 =
wholestagecodegen_vectorizedHashMapRow.getLong(2);
/* 329 */ wholestagecodegen_rowWriter.write(2, wholestagecodegen_value2);
/* 330 */
wholestagecodegen_result.setTotalSize(wholestagecodegen_holder.totalSize());
/* 331 */
/* 332 */ append(wholestagecodegen_result);
/* 333 */
/* 334 */ if (shouldStop()) return;
/* 335 */ }
/* 336 */
/* 337 */ agg_vectorizedHashMap.close();
/* 338 */
/* 339 */ while (agg_mapIter.next()) {
/* 340 */ wholestagecodegen_numOutputRows.add(1);
/* 341 */ UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 342 */ UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();
/* 343 */
/* 344 */ UnsafeRow agg_resultRow = agg_unsafeRowJoiner.join(agg_aggKey,
agg_aggBuffer);
/* 345 */
/* 346 */ append(agg_resultRow);
/* 347 */
/* 348 */ if (shouldStop()) return;
/* 349 */ }
/* 350 */
/* 351 */ agg_mapIter.close();
/* 352 */ if (agg_sorter == null) {
/* 353 */ agg_hashMap.free();
/* 354 */ }
/* 355 */ }
/* 356 */ }
== Subtree 4 / 5 ==
*Sort [Origin#155 ASC, UniqueCarrier#147 ASC], false, 0
+- *HashAggregate(key=[Origin#155,UniqueCarrier#147], functions=[count(1)],
output=[Origin#155,UniqueCarrier#147,total#97L])
+- Exchange hashpartitioning(Origin#155, UniqueCarrier#147, 200)
+- *HashAggregate(key=[Origin#155,UniqueCarrier#147],
functions=[partial_count(1)], output=[Origin#155,UniqueCarrier#147,count#303L])
+- *Project [UniqueCarrier#147, Origin#155]
+- *Filter (((isnotnull(UniqueCarrier#147) &&
isnotnull(Origin#155)) && isnotnull(Dest#156)) && (Dest#156 = ORD))
+- *Scan csv [UniqueCarrier#147,Origin#155,Dest#156] Format:
CSV, InputPaths: file:/home/robbins/brandberry/2008.csv, PushedFilters:
[IsNotNull(UniqueCarrier), IsNotNull(Origin), IsNotNull(Dest),
EqualTo(Dest,ORD)], ReadSchema:
struct<UniqueCarrier:string,Origin:string,Dest:string>
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends
org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */ private Object[] references;
/* 007 */ private boolean sort_needToSort;
/* 008 */ private org.apache.spark.sql.execution.SortExec sort_plan;
/* 009 */ private org.apache.spark.sql.execution.UnsafeExternalRowSorter
sort_sorter;
/* 010 */ private org.apache.spark.executor.TaskMetrics sort_metrics;
/* 011 */ private scala.collection.Iterator<UnsafeRow> sort_sortedIter;
/* 012 */ private boolean agg_initAgg;
/* 013 */ private boolean agg_bufIsNull;
/* 014 */ private long agg_bufValue;
/* 015 */ private org.apache.spark.sql.execution.aggregate.HashAggregateExec
agg_plan;
/* 016 */ private
org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
/* 017 */ private org.apache.spark.sql.execution.UnsafeKVExternalSorter
agg_sorter;
/* 018 */ private org.apache.spark.unsafe.KVIterator agg_mapIter;
/* 019 */ private org.apache.spark.sql.execution.metric.SQLMetric
agg_peakMemory;
/* 020 */ private org.apache.spark.sql.execution.metric.SQLMetric
agg_spillSize;
/* 021 */ private scala.collection.Iterator inputadapter_input;
/* 022 */ private UnsafeRow agg_result;
/* 023 */ private
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 024 */ private
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 025 */ private UnsafeRow agg_result1;
/* 026 */ private
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder1;
/* 027 */ private
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
agg_rowWriter1;
/* 028 */ private org.apache.spark.sql.execution.metric.SQLMetric
sort_numOutputRows;
/* 029 */ private org.apache.spark.sql.execution.metric.SQLMetric
sort_aggTime;
/* 030 */ private org.apache.spark.sql.execution.metric.SQLMetric
sort_peakMemory;
/* 031 */ private org.apache.spark.sql.execution.metric.SQLMetric
sort_spillSize;
/* 032 */ private org.apache.spark.sql.execution.metric.SQLMetric
sort_sortTime;
/* 033 */
/* 034 */ public GeneratedIterator(Object[] references) {
/* 035 */ this.references = references;
/* 036 */ }
/* 037 */
/* 038 */ public void init(int index, scala.collection.Iterator inputs[]) {
/* 039 */ partitionIndex = index;
/* 040 */ sort_needToSort = true;
/* 041 */ this.sort_plan = (org.apache.spark.sql.execution.SortExec)
references[0];
/* 042 */ sort_sorter = sort_plan.createSorter();
/* 043 */ sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();
/* 044 */
/* 045 */ agg_initAgg = false;
/* 046 */
/* 047 */ this.agg_plan =
(org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[1];
/* 048 */
/* 049 */ this.agg_peakMemory =
(org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 050 */ this.agg_spillSize =
(org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 051 */ inputadapter_input = inputs[0];
/* 052 */ agg_result = new UnsafeRow(2);
/* 053 */ this.agg_holder = new
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 64);
/* 054 */ this.agg_rowWriter = new
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder,
2);
/* 055 */ agg_result1 = new UnsafeRow(3);
/* 056 */ this.agg_holder1 = new
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result1, 64);
/* 057 */ this.agg_rowWriter1 = new
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder1,
3);
/* 058 */ this.sort_numOutputRows =
(org.apache.spark.sql.execution.metric.SQLMetric) references[4];
/* 059 */ this.sort_aggTime =
(org.apache.spark.sql.execution.metric.SQLMetric) references[5];
/* 060 */ this.sort_peakMemory =
(org.apache.spark.sql.execution.metric.SQLMetric) references[6];
/* 061 */ this.sort_spillSize =
(org.apache.spark.sql.execution.metric.SQLMetric) references[7];
/* 062 */ this.sort_sortTime =
(org.apache.spark.sql.execution.metric.SQLMetric) references[8];
/* 063 */ }
/* 064 */
/* 065 */ private void agg_doAggregateWithKeys() throws java.io.IOException {
/* 066 */ agg_hashMap = agg_plan.createHashMap();
/* 067 */
/* 068 */ while (inputadapter_input.hasNext()) {
/* 069 */ InternalRow inputadapter_row = (InternalRow)
inputadapter_input.next();
/* 070 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 071 */ UTF8String inputadapter_value = inputadapter_isNull ? null :
(inputadapter_row.getUTF8String(0));
/* 072 */ boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 073 */ UTF8String inputadapter_value1 = inputadapter_isNull1 ? null :
(inputadapter_row.getUTF8String(1));
/* 074 */ long inputadapter_value2 = inputadapter_row.getLong(2);
/* 075 */
/* 076 */ UnsafeRow agg_unsafeRowAggBuffer = null;
/* 077 */ org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row
agg_vectorizedAggBuffer = null;
/* 078 */
/* 079 */ if (agg_vectorizedAggBuffer == null) {
/* 080 */ // generate grouping key
/* 081 */ agg_holder.reset();
/* 082 */
/* 083 */ agg_rowWriter.zeroOutNullBytes();
/* 084 */
/* 085 */ if (inputadapter_isNull) {
/* 086 */ agg_rowWriter.setNullAt(0);
/* 087 */ } else {
/* 088 */ agg_rowWriter.write(0, inputadapter_value);
/* 089 */ }
/* 090 */
/* 091 */ if (inputadapter_isNull1) {
/* 092 */ agg_rowWriter.setNullAt(1);
/* 093 */ } else {
/* 094 */ agg_rowWriter.write(1, inputadapter_value1);
/* 095 */ }
/* 096 */ agg_result.setTotalSize(agg_holder.totalSize());
/* 097 */ int agg_value6 = 42;
/* 098 */
/* 099 */ if (!inputadapter_isNull) {
/* 100 */ agg_value6 =
org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(inputadapter_value.getBaseObject(),
inputadapter_value.getBaseOffset(), inputadapter_value.numBytes(), agg_value6);
/* 101 */ }
/* 102 */
/* 103 */ if (!inputadapter_isNull1) {
/* 104 */ agg_value6 =
org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(inputadapter_value1.getBaseObject(),
inputadapter_value1.getBaseOffset(), inputadapter_value1.numBytes(),
agg_value6);
/* 105 */ }
/* 106 */ if (true) {
/* 107 */ // try to get the buffer from hash map
/* 108 */ agg_unsafeRowAggBuffer =
/* 109 */ agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result,
agg_value6);
/* 110 */ }
/* 111 */ if (agg_unsafeRowAggBuffer == null) {
/* 112 */ if (agg_sorter == null) {
/* 113 */ agg_sorter =
agg_hashMap.destructAndCreateExternalSorter();
/* 114 */ } else {
/* 115 */
agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
/* 116 */ }
/* 117 */
/* 118 */ // the hash map had be spilled, it should have enough memory now,
/* 119 */ // try to allocate buffer again.
/* 120 */ agg_unsafeRowAggBuffer =
/* 121 */ agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result,
agg_value6);
/* 122 */ if (agg_unsafeRowAggBuffer == null) {
/* 123 */ // failed to allocate the first page
/* 124 */ throw new OutOfMemoryError("No enough memory for aggregation");
/* 125 */ }
/* 126 */ }
/* 127 */ }
/* 128 */
/* 129 */ if (agg_vectorizedAggBuffer != null) {
/* 130 */ // update vectorized row
/* 131 */
/* 132 */ } else {
/* 133 */ // update unsafe row
/* 134 */
/* 135 */ // common sub-expressions
/* 136 */
/* 137 */ // evaluate aggregate function
/* 138 */ long agg_value10 = agg_unsafeRowAggBuffer.getLong(0);
/* 139 */
/* 140 */ long agg_value9 = -1L;
/* 141 */ agg_value9 = agg_value10 + inputadapter_value2;
/* 142 */ // update unsafe row buffer
/* 143 */ agg_unsafeRowAggBuffer.setLong(0, agg_value9);
/* 144 */
/* 145 */ }
/* 146 */ if (shouldStop()) return;
/* 147 */ }
/* 148 */
/* 149 */ agg_mapIter = agg_plan.finishAggregate(agg_hashMap, agg_sorter,
agg_peakMemory, agg_spillSize);
/* 150 */ }
/* 151 */
/* 152 */ private void sort_addToSorter() throws java.io.IOException {
/* 153 */ if (!agg_initAgg) {
/* 154 */ agg_initAgg = true;
/* 155 */ long sort_beforeAgg = System.nanoTime();
/* 156 */ agg_doAggregateWithKeys();
/* 157 */ sort_aggTime.add((System.nanoTime() - sort_beforeAgg) /
1000000);
/* 158 */ }
/* 159 */
/* 160 */ // output the result
/* 161 */
/* 162 */ while (agg_mapIter.next()) {
/* 163 */ sort_numOutputRows.add(1);
/* 164 */ UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 165 */ UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();
/* 166 */
/* 167 */ boolean agg_isNull11 = agg_aggKey.isNullAt(0);
/* 168 */ UTF8String agg_value12 = agg_isNull11 ? null :
(agg_aggKey.getUTF8String(0));
/* 169 */ boolean agg_isNull12 = agg_aggKey.isNullAt(1);
/* 170 */ UTF8String agg_value13 = agg_isNull12 ? null :
(agg_aggKey.getUTF8String(1));
/* 171 */ long agg_value14 = agg_aggBuffer.getLong(0);
/* 172 */
/* 173 */ agg_holder1.reset();
/* 174 */
/* 175 */ agg_rowWriter1.zeroOutNullBytes();
/* 176 */
/* 177 */ if (agg_isNull11) {
/* 178 */ agg_rowWriter1.setNullAt(0);
/* 179 */ } else {
/* 180 */ agg_rowWriter1.write(0, agg_value12);
/* 181 */ }
/* 182 */
/* 183 */ if (agg_isNull12) {
/* 184 */ agg_rowWriter1.setNullAt(1);
/* 185 */ } else {
/* 186 */ agg_rowWriter1.write(1, agg_value13);
/* 187 */ }
/* 188 */
/* 189 */ agg_rowWriter1.write(2, agg_value14);
/* 190 */ agg_result1.setTotalSize(agg_holder1.totalSize());
/* 191 */ sort_sorter.insertRow((UnsafeRow)agg_result1);
/* 192 */
/* 193 */ if (shouldStop()) return;
/* 194 */ }
/* 195 */
/* 196 */ agg_mapIter.close();
/* 197 */ if (agg_sorter == null) {
/* 198 */ agg_hashMap.free();
/* 199 */ }
/* 200 */
/* 201 */ }
/* 202 */
/* 203 */ protected void processNext() throws java.io.IOException {
/* 204 */ if (sort_needToSort) {
/* 205 */ long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
/* 206 */ sort_addToSorter();
/* 207 */ sort_sortedIter = sort_sorter.sort();
/* 208 */ sort_sortTime.add(sort_sorter.getSortTimeNanos() / 1000000);
/* 209 */ sort_peakMemory.add(sort_sorter.getPeakMemoryUsage());
/* 210 */ sort_spillSize.add(sort_metrics.memoryBytesSpilled() -
sort_spillSizeBefore);
/* 211 */
sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
/* 212 */ sort_needToSort = false;
/* 213 */ }
/* 214 */
/* 215 */ while (sort_sortedIter.hasNext()) {
/* 216 */ UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();
/* 217 */
/* 218 */ append(sort_outputRow);
/* 219 */
/* 220 */ if (shouldStop()) return;
/* 221 */ }
/* 222 */ }
/* 223 */ }
== Subtree 5 / 5 ==
*Sort [Origin#16 ASC, UniqueCarrier#8 ASC], false, 0
+- *HashAggregate(key=[Origin#16,UniqueCarrier#8], functions=[count(1)],
output=[Origin#16,UniqueCarrier#8,count#134L])
+- Exchange hashpartitioning(Origin#16, UniqueCarrier#8, 200)
+- *HashAggregate(key=[Origin#16,UniqueCarrier#8],
functions=[partial_count(1)], output=[Origin#16,UniqueCarrier#8,count#296L])
+- *Project [UniqueCarrier#8, Origin#16]
+- *Filter (((((((isnotnull(Origin#16) &&
isnotnull(UniqueCarrier#8)) && isnotnull(Cancelled#21)) &&
isnotnull(CancellationCode#22)) && NOT (Cancelled#21 = 0)) &&
(CancellationCode#22 = A)) && isnotnull(Dest#17)) && (Dest#17 = ORD))
+- *Scan csv
[UniqueCarrier#8,Origin#16,Dest#17,Cancelled#21,CancellationCode#22] Format:
CSV, InputPaths: file:/home/robbins/brandberry/2008.csv, PushedFilters:
[IsNotNull(Origin), IsNotNull(UniqueCarrier), IsNotNull(Cancelled),
IsNotNull(CancellationCode), ..., ReadSchema:
struct<UniqueCarrier:string,Origin:string,Dest:string,Cancelled:int,CancellationCode:string>
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends
org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */ private Object[] references;
/* 007 */ private boolean sort_needToSort;
/* 008 */ private org.apache.spark.sql.execution.SortExec sort_plan;
/* 009 */ private org.apache.spark.sql.execution.UnsafeExternalRowSorter
sort_sorter;
/* 010 */ private org.apache.spark.executor.TaskMetrics sort_metrics;
/* 011 */ private scala.collection.Iterator<UnsafeRow> sort_sortedIter;
/* 012 */ private boolean agg_initAgg;
/* 013 */ private boolean agg_bufIsNull;
/* 014 */ private long agg_bufValue;
/* 015 */ private org.apache.spark.sql.execution.aggregate.HashAggregateExec
agg_plan;
/* 016 */ private
org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
/* 017 */ private org.apache.spark.sql.execution.UnsafeKVExternalSorter
agg_sorter;
/* 018 */ private org.apache.spark.unsafe.KVIterator agg_mapIter;
/* 019 */ private org.apache.spark.sql.execution.metric.SQLMetric
agg_peakMemory;
/* 020 */ private org.apache.spark.sql.execution.metric.SQLMetric
agg_spillSize;
/* 021 */ private scala.collection.Iterator inputadapter_input;
/* 022 */ private UnsafeRow agg_result;
/* 023 */ private
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 024 */ private
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 025 */ private UnsafeRow agg_result1;
/* 026 */ private
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder1;
/* 027 */ private
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
agg_rowWriter1;
/* 028 */ private org.apache.spark.sql.execution.metric.SQLMetric
sort_numOutputRows;
/* 029 */ private org.apache.spark.sql.execution.metric.SQLMetric
sort_aggTime;
/* 030 */ private org.apache.spark.sql.execution.metric.SQLMetric
sort_peakMemory;
/* 031 */ private org.apache.spark.sql.execution.metric.SQLMetric
sort_spillSize;
/* 032 */ private org.apache.spark.sql.execution.metric.SQLMetric
sort_sortTime;
/* 033 */
/* 034 */ public GeneratedIterator(Object[] references) {
/* 035 */ this.references = references;
/* 036 */ }
/* 037 */
/* 038 */ public void init(int index, scala.collection.Iterator inputs[]) {
/* 039 */ partitionIndex = index;
/* 040 */ sort_needToSort = true;
/* 041 */ this.sort_plan = (org.apache.spark.sql.execution.SortExec)
references[0];
/* 042 */ sort_sorter = sort_plan.createSorter();
/* 043 */ sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();
/* 044 */
/* 045 */ agg_initAgg = false;
/* 046 */
/* 047 */ this.agg_plan =
(org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[1];
/* 048 */
/* 049 */ this.agg_peakMemory =
(org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 050 */ this.agg_spillSize =
(org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 051 */ inputadapter_input = inputs[0];
/* 052 */ agg_result = new UnsafeRow(2);
/* 053 */ this.agg_holder = new
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 64);
/* 054 */ this.agg_rowWriter = new
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder,
2);
/* 055 */ agg_result1 = new UnsafeRow(3);
/* 056 */ this.agg_holder1 = new
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result1, 64);
/* 057 */ this.agg_rowWriter1 = new
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder1,
3);
/* 058 */ this.sort_numOutputRows =
(org.apache.spark.sql.execution.metric.SQLMetric) references[4];
/* 059 */ this.sort_aggTime =
(org.apache.spark.sql.execution.metric.SQLMetric) references[5];
/* 060 */ this.sort_peakMemory =
(org.apache.spark.sql.execution.metric.SQLMetric) references[6];
/* 061 */ this.sort_spillSize =
(org.apache.spark.sql.execution.metric.SQLMetric) references[7];
/* 062 */ this.sort_sortTime =
(org.apache.spark.sql.execution.metric.SQLMetric) references[8];
/* 063 */ }
/* 064 */
/* 065 */ private void agg_doAggregateWithKeys() throws java.io.IOException {
/* 066 */ agg_hashMap = agg_plan.createHashMap();
/* 067 */
/* 068 */ while (inputadapter_input.hasNext()) {
/* 069 */ InternalRow inputadapter_row = (InternalRow)
inputadapter_input.next();
/* 070 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 071 */ UTF8String inputadapter_value = inputadapter_isNull ? null :
(inputadapter_row.getUTF8String(0));
/* 072 */ boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 073 */ UTF8String inputadapter_value1 = inputadapter_isNull1 ? null :
(inputadapter_row.getUTF8String(1));
/* 074 */ long inputadapter_value2 = inputadapter_row.getLong(2);
/* 075 */
/* 076 */ UnsafeRow agg_unsafeRowAggBuffer = null;
/* 077 */ org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row
agg_vectorizedAggBuffer = null;
/* 078 */
/* 079 */ if (agg_vectorizedAggBuffer == null) {
/* 080 */ // generate grouping key
/* 081 */ agg_holder.reset();
/* 082 */
/* 083 */ agg_rowWriter.zeroOutNullBytes();
/* 084 */
/* 085 */ if (inputadapter_isNull) {
/* 086 */ agg_rowWriter.setNullAt(0);
/* 087 */ } else {
/* 088 */ agg_rowWriter.write(0, inputadapter_value);
/* 089 */ }
/* 090 */
/* 091 */ if (inputadapter_isNull1) {
/* 092 */ agg_rowWriter.setNullAt(1);
/* 093 */ } else {
/* 094 */ agg_rowWriter.write(1, inputadapter_value1);
/* 095 */ }
/* 096 */ agg_result.setTotalSize(agg_holder.totalSize());
/* 097 */ int agg_value6 = 42;
/* 098 */
/* 099 */ if (!inputadapter_isNull) {
/* 100 */ agg_value6 =
org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(inputadapter_value.getBaseObject(),
inputadapter_value.getBaseOffset(), inputadapter_value.numBytes(), agg_value6);
/* 101 */ }
/* 102 */
/* 103 */ if (!inputadapter_isNull1) {
/* 104 */ agg_value6 =
org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(inputadapter_value1.getBaseObject(),
inputadapter_value1.getBaseOffset(), inputadapter_value1.numBytes(),
agg_value6);
/* 105 */ }
/* 106 */ if (true) {
/* 107 */ // try to get the buffer from hash map
/* 108 */ agg_unsafeRowAggBuffer =
/* 109 */ agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result,
agg_value6);
/* 110 */ }
/* 111 */ if (agg_unsafeRowAggBuffer == null) {
/* 112 */ if (agg_sorter == null) {
/* 113 */ agg_sorter =
agg_hashMap.destructAndCreateExternalSorter();
/* 114 */ } else {
/* 115 */
agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
/* 116 */ }
/* 117 */
/* 118 */ // the hash map had be spilled, it should have enough
memory now,
/* 119 */ // try to allocate buffer again.
/* 120 */ agg_unsafeRowAggBuffer =
/* 121 */ agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result,
agg_value6);
/* 122 */ if (agg_unsafeRowAggBuffer == null) {
/* 123 */ // failed to allocate the first page
/* 124 */ throw new OutOfMemoryError("No enough memory for
aggregation");
/* 125 */ }
/* 126 */ }
/* 127 */ }
/* 128 */
/* 129 */ if (agg_vectorizedAggBuffer != null) {
/* 130 */ // update vectorized row
/* 131 */
/* 132 */ } else {
/* 133 */ // update unsafe row
/* 134 */
/* 135 */ // common sub-expressions
/* 136 */
/* 137 */ // evaluate aggregate function
/* 138 */ long agg_value10 = agg_unsafeRowAggBuffer.getLong(0);
/* 139 */
/* 140 */ long agg_value9 = -1L;
/* 141 */ agg_value9 = agg_value10 + inputadapter_value2;
/* 142 */ // update unsafe row buffer
/* 143 */ agg_unsafeRowAggBuffer.setLong(0, agg_value9);
/* 144 */
/* 145 */ }
/* 146 */ if (shouldStop()) return;
/* 147 */ }
/* 148 */
/* 149 */ agg_mapIter = agg_plan.finishAggregate(agg_hashMap, agg_sorter,
agg_peakMemory, agg_spillSize);
/* 150 */ }
/* 151 */
/* 152 */ private void sort_addToSorter() throws java.io.IOException {
/* 153 */ if (!agg_initAgg) {
/* 154 */ agg_initAgg = true;
/* 155 */ long sort_beforeAgg = System.nanoTime();
/* 156 */ agg_doAggregateWithKeys();
/* 157 */ sort_aggTime.add((System.nanoTime() - sort_beforeAgg) /
1000000);
/* 158 */ }
/* 159 */
/* 160 */ // output the result
/* 161 */
/* 162 */ while (agg_mapIter.next()) {
/* 163 */ sort_numOutputRows.add(1);
/* 164 */ UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 165 */ UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();
/* 166 */
/* 167 */ boolean agg_isNull11 = agg_aggKey.isNullAt(0);
/* 168 */ UTF8String agg_value12 = agg_isNull11 ? null :
(agg_aggKey.getUTF8String(0));
/* 169 */ boolean agg_isNull12 = agg_aggKey.isNullAt(1);
/* 170 */ UTF8String agg_value13 = agg_isNull12 ? null :
(agg_aggKey.getUTF8String(1));
/* 171 */ long agg_value14 = agg_aggBuffer.getLong(0);
/* 172 */
/* 173 */ agg_holder1.reset();
/* 174 */
/* 175 */ agg_rowWriter1.zeroOutNullBytes();
/* 176 */
/* 177 */ if (agg_isNull11) {
/* 178 */ agg_rowWriter1.setNullAt(0);
/* 179 */ } else {
/* 180 */ agg_rowWriter1.write(0, agg_value12);
/* 181 */ }
/* 182 */
/* 183 */ if (agg_isNull12) {
/* 184 */ agg_rowWriter1.setNullAt(1);
/* 185 */ } else {
/* 186 */ agg_rowWriter1.write(1, agg_value13);
/* 187 */ }
/* 188 */
/* 189 */ agg_rowWriter1.write(2, agg_value14);
/* 190 */ agg_result1.setTotalSize(agg_holder1.totalSize());
/* 191 */ sort_sorter.insertRow((UnsafeRow)agg_result1);
/* 192 */
/* 193 */ if (shouldStop()) return;
/* 194 */ }
/* 195 */
/* 196 */ agg_mapIter.close();
/* 197 */ if (agg_sorter == null) {
/* 198 */ agg_hashMap.free();
/* 199 */ }
/* 200 */
/* 201 */ }
/* 202 */
/* 203 */ protected void processNext() throws java.io.IOException {
/* 204 */ if (sort_needToSort) {
/* 205 */ long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
/* 206 */ sort_addToSorter();
/* 207 */ sort_sortedIter = sort_sorter.sort();
/* 208 */ sort_sortTime.add(sort_sorter.getSortTimeNanos() / 1000000);
/* 209 */ sort_peakMemory.add(sort_sorter.getPeakMemoryUsage());
/* 210 */ sort_spillSize.add(sort_metrics.memoryBytesSpilled() -
sort_spillSizeBefore);
/* 211 */
sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
/* 212 */ sort_needToSort = false;
/* 213 */ }
/* 214 */
/* 215 */ while (sort_sortedIter.hasNext()) {
/* 216 */ UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();
/* 217 */
/* 218 */ append(sort_outputRow);
/* 219 */
/* 220 */ if (shouldStop()) return;
/* 221 */ }
/* 222 */ }
/* 223 */ }
{code}
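For reference, a codegen dump in the format above ("Found 5 WholeStageCodegen subtrees." followed by one "== Subtree i / n ==" section per stage) can be produced with the helpers in org.apache.spark.sql.execution.debug. The sketch below is illustrative only: the CSV path is a placeholder and the query merely mirrors the plan shape shown; it is not the reporter's actual application.
{code}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.debug._   // adds debugCodegen() to Dataset

val spark = SparkSession.builder().appName("codegen-dump").getOrCreate()

// Placeholder input path; the plans above read the 2008 airline on-time CSV.
val flights = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/path/to/2008.csv")

// Query shaped like the plan above: flights into ORD cancelled with code A,
// counted per (Origin, UniqueCarrier).
val cancellations = flights
  .filter("Cancelled != 0 AND CancellationCode = 'A' AND Dest = 'ORD'")
  .groupBy("Origin", "UniqueCarrier")
  .count()

// Prints the number of whole-stage codegen subtrees and the generated Java for each.
cancellations.debugCodegen()
{code}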
> segmentation violation in o.a.s.unsafe.types.UTF8String
> --------------------------------------------------------
>
> Key: SPARK-15822
> URL: https://issues.apache.org/jira/browse/SPARK-15822
> Project: Spark
> Issue Type: Bug
> Affects Versions: 2.0.0
> Environment: linux amd64
> openjdk version "1.8.0_91"
> OpenJDK Runtime Environment (build 1.8.0_91-b14)
> OpenJDK 64-Bit Server VM (build 25.91-b14, mixed mode)
> Reporter: Pete Robbins
> Assignee: Herman van Hovell
> Priority: Blocker
>
> Executors fail with a segmentation violation while running an application with
> spark.memory.offHeap.enabled true
> spark.memory.offHeap.size 512m
> Also now reproduced with
> spark.memory.offHeap.enabled false
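> As a minimal, assumed sketch of how those settings would be applied (not taken from the
> failing application itself; only the two memory settings come from this report):
> {code}
> // Hypothetical reproduction setup for the configuration listed above.
> val spark = org.apache.spark.sql.SparkSession.builder()
>   .appName("SPARK-15822-repro")
>   .config("spark.memory.offHeap.enabled", "true")   // crash also seen with "false"
>   .config("spark.memory.offHeap.size", "512m")
>   .getOrCreate()
> {code}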
> {noformat}
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> # SIGSEGV (0xb) at pc=0x00007f4559b4d4bd, pid=14182, tid=139935319750400
> #
> # JRE version: OpenJDK Runtime Environment (8.0_91-b14) (build 1.8.0_91-b14)
> # Java VM: OpenJDK 64-Bit Server VM (25.91-b14 mixed mode linux-amd64
> compressed oops)
> # Problematic frame:
> # J 4816 C2
> org.apache.spark.unsafe.types.UTF8String.compareTo(Lorg/apache/spark/unsafe/types/UTF8String;)I
> (64 bytes) @ 0x00007f4559b4d4bd [0x00007f4559b4d460+0x5d]
> {noformat}
> We initially saw this with IBM Java on a PowerPC box, but it is recreatable on Linux
> with OpenJDK. On Linux with IBM Java 8 we see a null pointer exception at the
> same code point:
> {noformat}
> 16/06/08 11:14:58 ERROR Executor: Exception in task 1.0 in stage 5.0 (TID 48)
> java.lang.NullPointerException
> at
> org.apache.spark.unsafe.types.UTF8String.compareTo(UTF8String.java:831)
> at org.apache.spark.unsafe.types.UTF8String.compare(UTF8String.java:844)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.findNextInnerJoinRows$(Unknown
> Source)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
> Source)
> at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$doExecute$2$$anon$2.hasNext(WholeStageCodegenExec.scala:377)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at
> scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
> at org.spark_project.guava.collect.Ordering.leastOf(Ordering.java:664)
> at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
> at
> org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$30.apply(RDD.scala:1365)
> at
> org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$30.apply(RDD.scala:1362)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:757)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:757)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:85)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.lang.Thread.run(Thread.java:785)
> {noformat}