GitHub user maropu commented on the issue: https://github.com/apache/incubator-hivemall/pull/37 A codegen'd top-K join is as follows: ``` Found 1 WholeStageCodegen subtrees. == Subtree 1 / 1 == *ShuffledHashJoinTopK -1, [group#10], [group#27] :- Exchange hashpartitioning(group#10, 200) : +- LocalTableScan [userId#9, group#10, x#11, y#12] +- Exchange hashpartitioning(group#27, 200) +- LocalTableScan [group#27, position#28, x#29, y#30] Generated code: /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIterator(references); /* 003 */ } /* 004 */ /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private org.apache.spark.sql.execution.joins.ShuffledHashJoinTopKExec shuffledhashjointopk_topKJoin; /* 009 */ private org.apache.spark.sql.execution.joins.PriorityQueueShim shuffledhashjointopk_queue; /* 010 */ private scala.collection.Iterator shuffledhashjointopk_leftIter; /* 011 */ private InternalRow shuffledhashjointopk_leftRow; /* 012 */ private int shuffledhashjointopk_value; /* 013 */ private UTF8String shuffledhashjointopk_value1; /* 014 */ private boolean shuffledhashjointopk_isNull; /* 015 */ private double shuffledhashjointopk_value2; /* 016 */ private double shuffledhashjointopk_value3; /* 017 */ private int shuffledhashjointopk_value8; /* 018 */ private double shuffledhashjointopk_value9; /* 019 */ private org.apache.spark.sql.execution.joins.ShuffledHashJoinTopKExec shuffledhashjointopk_joinExec; /* 020 */ private org.apache.spark.sql.execution.joins.HashedRelation shuffledhashjointopk_relation; /* 021 */ private UnsafeRow shuffledhashjointopk_result; /* 022 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder shuffledhashjointopk_holder; /* 023 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter shuffledhashjointopk_rowWriter; /* 
024 */ private org.apache.spark.sql.catalyst.expressions.JoinedRow shuffledhashjointopk_joinedRow; /* 025 */ private int shuffledhashjointopk_value23; /* 026 */ private boolean shuffledhashjointopk_isNull18; /* 027 */ private double shuffledhashjointopk_value24; /* 028 */ private boolean shuffledhashjointopk_isNull19; /* 029 */ private int shuffledhashjointopk_value25; /* 030 */ private UTF8String shuffledhashjointopk_value26; /* 031 */ private boolean shuffledhashjointopk_isNull20; /* 032 */ private double shuffledhashjointopk_value27; /* 033 */ private double shuffledhashjointopk_value28; /* 034 */ private UTF8String shuffledhashjointopk_value29; /* 035 */ private boolean shuffledhashjointopk_isNull21; /* 036 */ private UTF8String shuffledhashjointopk_value30; /* 037 */ private boolean shuffledhashjointopk_isNull22; /* 038 */ private double shuffledhashjointopk_value31; /* 039 */ private double shuffledhashjointopk_value32; /* 040 */ private org.apache.spark.sql.execution.metric.SQLMetric shuffledhashjointopk_numOutputRows; /* 041 */ private UnsafeRow shuffledhashjointopk_result1; /* 042 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder shuffledhashjointopk_holder1; /* 043 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter shuffledhashjointopk_rowWriter1; /* 044 */ /* 045 */ public GeneratedIterator(Object[] references) { /* 046 */ this.references = references; /* 047 */ } /* 048 */ /* 049 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 050 */ partitionIndex = index; /* 051 */ this.inputs = inputs; /* 052 */ wholestagecodegen_init_0(); /* 053 */ wholestagecodegen_init_1(); /* 054 */ /* 055 */ } /* 056 */ /* 057 */ private void wholestagecodegen_init_0() { /* 058 */ this.shuffledhashjointopk_topKJoin = (org.apache.spark.sql.execution.joins.ShuffledHashJoinTopKExec) references[0]; /* 059 */ shuffledhashjointopk_queue = shuffledhashjointopk_topKJoin.priorityQueue(); /* 060 */ 
shuffledhashjointopk_leftIter = inputs[0]; /* 061 */ /* 062 */ this.shuffledhashjointopk_joinExec = (org.apache.spark.sql.execution.joins.ShuffledHashJoinTopKExec) references[1]; /* 063 */ /* 064 */ shuffledhashjointopk_relation = (org.apache.spark.sql.execution.joins.HashedRelation) shuffledhashjointopk_joinExec.buildHashedRelation(inputs[1]); /* 065 */ incPeakExecutionMemory(shuffledhashjointopk_relation.estimatedSize()); /* 066 */ /* 067 */ shuffledhashjointopk_result = new UnsafeRow(1); /* 068 */ this.shuffledhashjointopk_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(shuffledhashjointopk_result, 32); /* 069 */ this.shuffledhashjointopk_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(shuffledhashjointopk_holder, 1); /* 070 */ shuffledhashjointopk_joinedRow = new org.apache.spark.sql.catalyst.expressions.JoinedRow(); /* 071 */ /* 072 */ this.shuffledhashjointopk_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2]; /* 073 */ /* 074 */ } /* 075 */ /* 076 */ private void wholestagecodegen_init_1() { /* 077 */ shuffledhashjointopk_result1 = new UnsafeRow(10); /* 078 */ this.shuffledhashjointopk_holder1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(shuffledhashjointopk_result1, 96); /* 079 */ this.shuffledhashjointopk_rowWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(shuffledhashjointopk_holder1, 10); /* 080 */ /* 081 */ } /* 082 */ /* 083 */ protected void processNext() throws java.io.IOException { /* 084 */ shuffledhashjointopk_leftRow = null; /* 085 */ while (shuffledhashjointopk_leftIter.hasNext()) { /* 086 */ shuffledhashjointopk_leftRow = (InternalRow) shuffledhashjointopk_leftIter.next(); /* 087 */ /* 088 */ // Generate join key for stream side /* 089 */ /* 090 */ shuffledhashjointopk_holder.reset(); /* 091 */ /* 092 */ shuffledhashjointopk_rowWriter.zeroOutNullBytes(); /* 093 */ /* 094 */ boolean 
shuffledhashjointopk_isNull5 = shuffledhashjointopk_leftRow.isNullAt(1); /* 095 */ UTF8String shuffledhashjointopk_value10 = shuffledhashjointopk_isNull5 ? null : (shuffledhashjointopk_leftRow.getUTF8String(1)); /* 096 */ if (shuffledhashjointopk_isNull5) { /* 097 */ shuffledhashjointopk_rowWriter.setNullAt(0); /* 098 */ } else { /* 099 */ shuffledhashjointopk_rowWriter.write(0, shuffledhashjointopk_value10); /* 100 */ } /* 101 */ shuffledhashjointopk_result.setTotalSize(shuffledhashjointopk_holder.totalSize()); /* 102 */ /* 103 */ // Find matches from HashedRelation /* 104 */ scala.collection.Iterator shuffledhashjointopk_matches = shuffledhashjointopk_result.anyNull()? null : (scala.collection.Iterator)shuffledhashjointopk_relation.get(shuffledhashjointopk_result); /* 105 */ if (shuffledhashjointopk_matches == null) continue; /* 106 */ /* 107 */ // Join top-K right rows /* 108 */ while (shuffledhashjointopk_matches.hasNext()) { /* 109 */ shuffledhashjointopk_value = shuffledhashjointopk_leftRow.getInt(0); /* 110 */ shuffledhashjointopk_isNull = shuffledhashjointopk_leftRow.isNullAt(1); /* 111 */ shuffledhashjointopk_value1 = shuffledhashjointopk_isNull ? 
null : (shuffledhashjointopk_leftRow.getUTF8String(1)); /* 112 */ shuffledhashjointopk_value2 = shuffledhashjointopk_leftRow.getDouble(2); /* 113 */ shuffledhashjointopk_value3 = shuffledhashjointopk_leftRow.getDouble(3); /* 114 */ InternalRow shuffledhashjointopk_rightRow = (InternalRow) shuffledhashjointopk_matches.next(); /* 115 */ /* 116 */ InternalRow row = shuffledhashjointopk_joinedRow.apply(shuffledhashjointopk_leftRow, shuffledhashjointopk_rightRow); /* 117 */ // Compute a score for the `row` /* 118 */ /* 118 */ /* 119 */ boolean shuffledhashjointopk_isNull7 = false; /* 120 */ /* 121 */ boolean shuffledhashjointopk_isNull8 = false; /* 122 */ /* 123 */ boolean shuffledhashjointopk_isNull9 = false; /* 124 */ /* 125 */ double shuffledhashjointopk_value15 = shuffledhashjointopk_joinedRow.getDouble(2); /* 126 */ /* 127 */ double shuffledhashjointopk_value16 = shuffledhashjointopk_joinedRow.getDouble(6); /* 128 */ double shuffledhashjointopk_value14 = -1.0; /* 129 */ shuffledhashjointopk_value14 = shuffledhashjointopk_value15 - shuffledhashjointopk_value16; /* 130 */ /* 131 */ double shuffledhashjointopk_value13 = -1.0; /* 132 */ shuffledhashjointopk_value13 = java.lang.Math.pow(shuffledhashjointopk_value14, 2.0D); /* 133 */ /* 134 */ boolean shuffledhashjointopk_isNull13 = false; /* 135 */ /* 136 */ boolean shuffledhashjointopk_isNull14 = false; /* 137 */ /* 138 */ double shuffledhashjointopk_value20 = shuffledhashjointopk_joinedRow.getDouble(7); /* 139 */ /* 140 */ double shuffledhashjointopk_value21 = shuffledhashjointopk_joinedRow.getDouble(7); /* 141 */ double shuffledhashjointopk_value19 = -1.0; /* 142 */ shuffledhashjointopk_value19 = shuffledhashjointopk_value20 - shuffledhashjointopk_value21; /* 143 */ /* 144 */ double shuffledhashjointopk_value18 = -1.0; /* 145 */ shuffledhashjointopk_value18 = java.lang.Math.pow(shuffledhashjointopk_value19, 2.0D); /* 146 */ double shuffledhashjointopk_value12 = -1.0; /* 147 */ shuffledhashjointopk_value12 = 
shuffledhashjointopk_value13 + shuffledhashjointopk_value18; /* 148 */ boolean shuffledhashjointopk_isNull6 = false; /* 149 */ double shuffledhashjointopk_value11 = -1.0; /* 150 */ /* 151 */ shuffledhashjointopk_value11 = java.lang.Math.sqrt(shuffledhashjointopk_value12); /* 152 */ shuffledhashjointopk_queue.insert(shuffledhashjointopk_value11, row); /* 153 */ } /* 154 */ /* 155 */ // Get top-K rows /* 156 */ scala.collection.Iterator shuffledhashjointopk_topKRows = shuffledhashjointopk_queue.get(); /* 157 */ shuffledhashjointopk_queue.clear(); /* 158 */ /* 159 */ // Output top-K rows /* 160 */ while (shuffledhashjointopk_topKRows.hasNext()) { /* 161 */ InternalRow shuffledhashjointopk_resultRow = (InternalRow) shuffledhashjointopk_topKRows.next(); /* 162 */ shuffledhashjointopk_numOutputRows.add(1); /* 163 */ /* 164 */ shuffledhashjointopk_isNull18 = shuffledhashjointopk_resultRow.isNullAt(0); /* 165 */ shuffledhashjointopk_value23 = shuffledhashjointopk_isNull18 ? -1 : (shuffledhashjointopk_resultRow.getInt(0)); /* 166 */ shuffledhashjointopk_isNull19 = shuffledhashjointopk_resultRow.isNullAt(1); /* 167 */ shuffledhashjointopk_value24 = shuffledhashjointopk_isNull19 ? -1.0 : (shuffledhashjointopk_resultRow.getDouble(1)); /* 168 */ shuffledhashjointopk_value25 = shuffledhashjointopk_resultRow.getInt(2); /* 169 */ shuffledhashjointopk_isNull20 = shuffledhashjointopk_resultRow.isNullAt(3); /* 170 */ shuffledhashjointopk_value26 = shuffledhashjointopk_isNull20 ? null : (shuffledhashjointopk_resultRow.getUTF8String(3)); /* 171 */ shuffledhashjointopk_value27 = shuffledhashjointopk_resultRow.getDouble(4); /* 172 */ shuffledhashjointopk_value28 = shuffledhashjointopk_resultRow.getDouble(5); /* 173 */ shuffledhashjointopk_isNull21 = shuffledhashjointopk_resultRow.isNullAt(6); /* 174 */ shuffledhashjointopk_value29 = shuffledhashjointopk_isNull21 ? 
null : (shuffledhashjointopk_resultRow.getUTF8String(6)); /* 175 */ shuffledhashjointopk_isNull22 = shuffledhashjointopk_resultRow.isNullAt(7); /* 176 */ shuffledhashjointopk_value30 = shuffledhashjointopk_isNull22 ? null : (shuffledhashjointopk_resultRow.getUTF8String(7)); /* 177 */ shuffledhashjointopk_value31 = shuffledhashjointopk_resultRow.getDouble(8); /* 178 */ shuffledhashjointopk_value32 = shuffledhashjointopk_resultRow.getDouble(9); /* 179 */ shuffledhashjointopk_holder1.reset(); /* 180 */ /* 181 */ shuffledhashjointopk_rowWriter1.zeroOutNullBytes(); /* 182 */ /* 182 */ /* 183 */ if (shuffledhashjointopk_isNull18) { /* 184 */ shuffledhashjointopk_rowWriter1.setNullAt(0); /* 185 */ } else { /* 186 */ shuffledhashjointopk_rowWriter1.write(0, shuffledhashjointopk_value23); /* 187 */ } /* 188 */ /* 189 */ if (shuffledhashjointopk_isNull19) { /* 190 */ shuffledhashjointopk_rowWriter1.setNullAt(1); /* 191 */ } else { /* 192 */ shuffledhashjointopk_rowWriter1.write(1, shuffledhashjointopk_value24); /* 193 */ } /* 194 */ /* 195 */ shuffledhashjointopk_rowWriter1.write(2, shuffledhashjointopk_value25); /* 196 */ /* 197 */ if (shuffledhashjointopk_isNull20) { /* 198 */ shuffledhashjointopk_rowWriter1.setNullAt(3); /* 199 */ } else { /* 200 */ shuffledhashjointopk_rowWriter1.write(3, shuffledhashjointopk_value26); /* 201 */ } /* 202 */ /* 203 */ shuffledhashjointopk_rowWriter1.write(4, shuffledhashjointopk_value27); /* 204 */ /* 205 */ shuffledhashjointopk_rowWriter1.write(5, shuffledhashjointopk_value28); /* 206 */ /* 207 */ if (shuffledhashjointopk_isNull21) { /* 208 */ shuffledhashjointopk_rowWriter1.setNullAt(6); /* 209 */ } else { /* 210 */ shuffledhashjointopk_rowWriter1.write(6, shuffledhashjointopk_value29); /* 211 */ } /* 212 */ /* 213 */ if (shuffledhashjointopk_isNull22) { /* 214 */ shuffledhashjointopk_rowWriter1.setNullAt(7); /* 215 */ } else { /* 216 */ shuffledhashjointopk_rowWriter1.write(7, shuffledhashjointopk_value30); /* 217 */ } /* 218 */ /* 219 
*/ shuffledhashjointopk_rowWriter1.write(8, shuffledhashjointopk_value31); /* 220 */ /* 221 */ shuffledhashjointopk_rowWriter1.write(9, shuffledhashjointopk_value32); /* 222 */ shuffledhashjointopk_result1.setTotalSize(shuffledhashjointopk_holder1.totalSize()); /* 223 */ append(shuffledhashjointopk_result1.copy()); /* 224 */ /* 225 */ } /* 226 */ /* 227 */ if (shouldStop()) return; /* 228 */ } /* 229 */ } /* 230 */ } ```
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact the Infrastructure team at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---