Github user maropu commented on the issue:

    https://github.com/apache/incubator-hivemall/pull/37
  
    The generated code for a top-K join is as follows:
    ```
    Found 1 WholeStageCodegen subtrees.
    == Subtree 1 / 1 ==
    *ShuffledHashJoinTopK -1, [group#10], [group#27]
    :- Exchange hashpartitioning(group#10, 200)
    :  +- LocalTableScan [userId#9, group#10, x#11, y#12]
    +- Exchange hashpartitioning(group#27, 200)
       +- LocalTableScan [group#27, position#28, x#29, y#30]
    
    Generated code:
    /* 001 */ public Object generate(Object[] references) {
    /* 002 */   return new GeneratedIterator(references);
    /* 003 */ }
    /* 004 */
    /* 005 */ final class GeneratedIterator extends 
org.apache.spark.sql.execution.BufferedRowIterator {
    /* 006 */   private Object[] references;
    /* 007 */   private scala.collection.Iterator[] inputs;
    /* 008 */   private 
org.apache.spark.sql.execution.joins.ShuffledHashJoinTopKExec 
shuffledhashjointopk_topKJoin;
    /* 009 */   private org.apache.spark.sql.execution.joins.PriorityQueueShim 
shuffledhashjointopk_queue;
    /* 010 */   private scala.collection.Iterator shuffledhashjointopk_leftIter;
    /* 011 */   private InternalRow shuffledhashjointopk_leftRow;
    /* 012 */   private int shuffledhashjointopk_value;
    /* 013 */   private UTF8String shuffledhashjointopk_value1;
    /* 014 */   private boolean shuffledhashjointopk_isNull;
    /* 015 */   private double shuffledhashjointopk_value2;
    /* 016 */   private double shuffledhashjointopk_value3;
    /* 017 */   private int shuffledhashjointopk_value8;
    /* 018 */   private double shuffledhashjointopk_value9;
    /* 019 */   private 
org.apache.spark.sql.execution.joins.ShuffledHashJoinTopKExec 
shuffledhashjointopk_joinExec;
    /* 020 */   private org.apache.spark.sql.execution.joins.HashedRelation 
shuffledhashjointopk_relation;
    /* 021 */   private UnsafeRow shuffledhashjointopk_result;
    /* 022 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder 
shuffledhashjointopk_holder;
    /* 023 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
shuffledhashjointopk_rowWriter;
    /* 024 */   private org.apache.spark.sql.catalyst.expressions.JoinedRow 
shuffledhashjointopk_joinedRow;
    /* 025 */   private int shuffledhashjointopk_value23;
    /* 026 */   private boolean shuffledhashjointopk_isNull18;
    /* 027 */   private double shuffledhashjointopk_value24;
    /* 028 */   private boolean shuffledhashjointopk_isNull19;
    /* 029 */   private int shuffledhashjointopk_value25;
    /* 030 */   private UTF8String shuffledhashjointopk_value26;
    /* 031 */   private boolean shuffledhashjointopk_isNull20;
    /* 032 */   private double shuffledhashjointopk_value27;
    /* 033 */   private double shuffledhashjointopk_value28;
    /* 034 */   private UTF8String shuffledhashjointopk_value29;
    /* 035 */   private boolean shuffledhashjointopk_isNull21;
    /* 036 */   private UTF8String shuffledhashjointopk_value30;
    /* 037 */   private boolean shuffledhashjointopk_isNull22;
    /* 038 */   private double shuffledhashjointopk_value31;
    /* 039 */   private double shuffledhashjointopk_value32;
    /* 040 */   private org.apache.spark.sql.execution.metric.SQLMetric 
shuffledhashjointopk_numOutputRows;
    /* 041 */   private UnsafeRow shuffledhashjointopk_result1;
    /* 042 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder 
shuffledhashjointopk_holder1;
    /* 043 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
shuffledhashjointopk_rowWriter1;
    /* 044 */
    /* 045 */   public GeneratedIterator(Object[] references) {
    /* 046 */     this.references = references;  
    /* 047 */   }
    /* 048 */
    /* 049 */   public void init(int index, scala.collection.Iterator[] inputs) 
{
    /* 050 */     partitionIndex = index;
    /* 051 */     this.inputs = inputs;
    /* 052 */     wholestagecodegen_init_0();
    /* 053 */     wholestagecodegen_init_1();
    /* 054 */
    /* 055 */   }
    /* 056 */
    /* 057 */   private void wholestagecodegen_init_0() {
    /* 058 */     this.shuffledhashjointopk_topKJoin = 
(org.apache.spark.sql.execution.joins.ShuffledHashJoinTopKExec) references[0];
    /* 059 */     shuffledhashjointopk_queue = 
shuffledhashjointopk_topKJoin.priorityQueue();
    /* 060 */     shuffledhashjointopk_leftIter = inputs[0];
    /* 061 */
    /* 062 */     this.shuffledhashjointopk_joinExec = 
(org.apache.spark.sql.execution.joins.ShuffledHashJoinTopKExec) references[1];
    /* 063 */
    /* 064 */     shuffledhashjointopk_relation = 
(org.apache.spark.sql.execution.joins.HashedRelation) 
shuffledhashjointopk_joinExec.buildHashedRelation(inputs[1]);
    /* 065 */     
incPeakExecutionMemory(shuffledhashjointopk_relation.estimatedSize());
    /* 066 */
    /* 067 */     shuffledhashjointopk_result = new UnsafeRow(1);
    /* 068 */     this.shuffledhashjointopk_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(shuffledhashjointopk_result,
 32);
    /* 069 */     this.shuffledhashjointopk_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(shuffledhashjointopk_holder,
 1);
    /* 070 */     shuffledhashjointopk_joinedRow = new 
org.apache.spark.sql.catalyst.expressions.JoinedRow();
    /* 071 */
    /* 072 */     this.shuffledhashjointopk_numOutputRows = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[2];
    /* 073 */
    /* 074 */   }
    /* 075 */
    /* 076 */   private void wholestagecodegen_init_1() {
    /* 077 */     shuffledhashjointopk_result1 = new UnsafeRow(10);
    /* 078 */     this.shuffledhashjointopk_holder1 = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(shuffledhashjointopk_result1,
 96);
    /* 079 */     this.shuffledhashjointopk_rowWriter1 = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(shuffledhashjointopk_holder1,
 10);
    /* 080 */
    /* 081 */   }
    /* 082 */
    /* 083 */   protected void processNext() throws java.io.IOException {
    /* 084 */     shuffledhashjointopk_leftRow = null;
    /* 085 */     while (shuffledhashjointopk_leftIter.hasNext()) {
    /* 086 */       shuffledhashjointopk_leftRow = (InternalRow) 
shuffledhashjointopk_leftIter.next();
    /* 087 */
    /* 088 */       // Generate join key for stream side
    /* 089 */
    /* 090 */       shuffledhashjointopk_holder.reset();
    /* 091 */
    /* 092 */       shuffledhashjointopk_rowWriter.zeroOutNullBytes();
    /* 093 */
    /* 094 */       boolean shuffledhashjointopk_isNull5 = 
shuffledhashjointopk_leftRow.isNullAt(1);
    /* 095 */       UTF8String shuffledhashjointopk_value10 = 
shuffledhashjointopk_isNull5 ? null : 
(shuffledhashjointopk_leftRow.getUTF8String(1));
    /* 096 */       if (shuffledhashjointopk_isNull5) {
    /* 097 */         shuffledhashjointopk_rowWriter.setNullAt(0);
    /* 098 */       } else {
    /* 099 */         shuffledhashjointopk_rowWriter.write(0, 
shuffledhashjointopk_value10);
    /* 100 */       }
    /* 101 */       
shuffledhashjointopk_result.setTotalSize(shuffledhashjointopk_holder.totalSize());
    /* 102 */
    /* 103 */       // Find matches from HashedRelation
    /* 104 */       scala.collection.Iterator shuffledhashjointopk_matches = 
shuffledhashjointopk_result.anyNull()? null : 
(scala.collection.Iterator)shuffledhashjointopk_relation.get(shuffledhashjointopk_result);
    /* 105 */       if (shuffledhashjointopk_matches == null) continue;
    /* 106 */
    /* 107 */       // Join top-K right rows
    /* 108 */       while (shuffledhashjointopk_matches.hasNext()) {
    /* 109 */         shuffledhashjointopk_value = 
shuffledhashjointopk_leftRow.getInt(0);
    /* 110 */         shuffledhashjointopk_isNull = 
shuffledhashjointopk_leftRow.isNullAt(1);
    /* 111 */         shuffledhashjointopk_value1 = shuffledhashjointopk_isNull 
? null : (shuffledhashjointopk_leftRow.getUTF8String(1));
    /* 112 */         shuffledhashjointopk_value2 = 
shuffledhashjointopk_leftRow.getDouble(2);
    /* 113 */         shuffledhashjointopk_value3 = 
shuffledhashjointopk_leftRow.getDouble(3);
    /* 114 */         InternalRow shuffledhashjointopk_rightRow = (InternalRow) 
shuffledhashjointopk_matches.next();
    /* 115 */
    /* 116 */         InternalRow row = 
shuffledhashjointopk_joinedRow.apply(shuffledhashjointopk_leftRow, 
shuffledhashjointopk_rightRow);
    /* 117 */         // Compute a score for the `row`
    /* 118 */
    /* 118 */
    /* 119 */         boolean shuffledhashjointopk_isNull7 = false;
    /* 120 */
    /* 121 */         boolean shuffledhashjointopk_isNull8 = false;
    /* 122 */
    /* 123 */         boolean shuffledhashjointopk_isNull9 = false;
    /* 124 */
    /* 125 */         double shuffledhashjointopk_value15 = 
shuffledhashjointopk_joinedRow.getDouble(2);
    /* 126 */
    /* 127 */         double shuffledhashjointopk_value16 = 
shuffledhashjointopk_joinedRow.getDouble(6);
    /* 128 */         double shuffledhashjointopk_value14 = -1.0;
    /* 129 */         shuffledhashjointopk_value14 = 
shuffledhashjointopk_value15 - shuffledhashjointopk_value16;
    /* 130 */
    /* 131 */         double shuffledhashjointopk_value13 = -1.0;
    /* 132 */         shuffledhashjointopk_value13 = 
java.lang.Math.pow(shuffledhashjointopk_value14, 2.0D);
    /* 133 */
    /* 134 */         boolean shuffledhashjointopk_isNull13 = false;
    /* 135 */
    /* 136 */         boolean shuffledhashjointopk_isNull14 = false;
    /* 137 */
    /* 138 */         double shuffledhashjointopk_value20 = 
shuffledhashjointopk_joinedRow.getDouble(7);
    /* 139 */
    /* 140 */         double shuffledhashjointopk_value21 = 
shuffledhashjointopk_joinedRow.getDouble(7);
    /* 141 */         double shuffledhashjointopk_value19 = -1.0;
    /* 142 */         shuffledhashjointopk_value19 = 
shuffledhashjointopk_value20 - shuffledhashjointopk_value21;
    /* 143 */
    /* 144 */         double shuffledhashjointopk_value18 = -1.0;
    /* 145 */         shuffledhashjointopk_value18 = 
java.lang.Math.pow(shuffledhashjointopk_value19, 2.0D);
    /* 146 */         double shuffledhashjointopk_value12 = -1.0;
    /* 147 */         shuffledhashjointopk_value12 = 
shuffledhashjointopk_value13 + shuffledhashjointopk_value18;
    /* 148 */         boolean shuffledhashjointopk_isNull6 = false;
    /* 149 */         double shuffledhashjointopk_value11 = -1.0;
    /* 150 */
    /* 151 */         shuffledhashjointopk_value11 = 
java.lang.Math.sqrt(shuffledhashjointopk_value12);
    /* 152 */         
shuffledhashjointopk_queue.insert(shuffledhashjointopk_value11, row);
    /* 153 */       }
    /* 154 */
    /* 155 */       // Get top-K rows
    /* 156 */       scala.collection.Iterator shuffledhashjointopk_topKRows = 
shuffledhashjointopk_queue.get();
    /* 157 */       shuffledhashjointopk_queue.clear();
    /* 158 */
    /* 159 */       // Output top-K rows
    /* 160 */       while (shuffledhashjointopk_topKRows.hasNext()) {
    /* 161 */         InternalRow shuffledhashjointopk_resultRow = 
(InternalRow) shuffledhashjointopk_topKRows.next();
    /* 162 */         shuffledhashjointopk_numOutputRows.add(1);
    /* 163 */
    /* 164 */         shuffledhashjointopk_isNull18 = 
shuffledhashjointopk_resultRow.isNullAt(0);
    /* 165 */         shuffledhashjointopk_value23 = 
shuffledhashjointopk_isNull18 ? -1 : (shuffledhashjointopk_resultRow.getInt(0));
    /* 166 */         shuffledhashjointopk_isNull19 = 
shuffledhashjointopk_resultRow.isNullAt(1);
    /* 167 */         shuffledhashjointopk_value24 = 
shuffledhashjointopk_isNull19 ? -1.0 : 
(shuffledhashjointopk_resultRow.getDouble(1));
    /* 168 */         shuffledhashjointopk_value25 = 
shuffledhashjointopk_resultRow.getInt(2);
    /* 169 */         shuffledhashjointopk_isNull20 = 
shuffledhashjointopk_resultRow.isNullAt(3);
    /* 170 */         shuffledhashjointopk_value26 = 
shuffledhashjointopk_isNull20 ? null : 
(shuffledhashjointopk_resultRow.getUTF8String(3));
    /* 171 */         shuffledhashjointopk_value27 = 
shuffledhashjointopk_resultRow.getDouble(4);
    /* 172 */         shuffledhashjointopk_value28 = 
shuffledhashjointopk_resultRow.getDouble(5);
    /* 173 */         shuffledhashjointopk_isNull21 = 
shuffledhashjointopk_resultRow.isNullAt(6);
    /* 174 */         shuffledhashjointopk_value29 = 
shuffledhashjointopk_isNull21 ? null : 
(shuffledhashjointopk_resultRow.getUTF8String(6));
    /* 175 */         shuffledhashjointopk_isNull22 = 
shuffledhashjointopk_resultRow.isNullAt(7);
    /* 176 */         shuffledhashjointopk_value30 = 
shuffledhashjointopk_isNull22 ? null : 
(shuffledhashjointopk_resultRow.getUTF8String(7));
    /* 177 */         shuffledhashjointopk_value31 = 
shuffledhashjointopk_resultRow.getDouble(8);
    /* 178 */         shuffledhashjointopk_value32 = 
shuffledhashjointopk_resultRow.getDouble(9);
    /* 179 */         shuffledhashjointopk_holder1.reset();
    /* 180 */
    /* 181 */         shuffledhashjointopk_rowWriter1.zeroOutNullBytes();
    /* 182 */
    /* 182 */
    /* 183 */         if (shuffledhashjointopk_isNull18) {
    /* 184 */           shuffledhashjointopk_rowWriter1.setNullAt(0);
    /* 185 */         } else {
    /* 186 */           shuffledhashjointopk_rowWriter1.write(0, 
shuffledhashjointopk_value23);
    /* 187 */         }
    /* 188 */
    /* 189 */         if (shuffledhashjointopk_isNull19) {
    /* 190 */           shuffledhashjointopk_rowWriter1.setNullAt(1);
    /* 191 */         } else {
    /* 192 */           shuffledhashjointopk_rowWriter1.write(1, 
shuffledhashjointopk_value24);
    /* 193 */         }
    /* 194 */
    /* 195 */         shuffledhashjointopk_rowWriter1.write(2, 
shuffledhashjointopk_value25);
    /* 196 */
    /* 197 */         if (shuffledhashjointopk_isNull20) {
    /* 198 */           shuffledhashjointopk_rowWriter1.setNullAt(3);
    /* 199 */         } else {
    /* 200 */           shuffledhashjointopk_rowWriter1.write(3, 
shuffledhashjointopk_value26);
    /* 201 */         }
    /* 202 */
    /* 203 */         shuffledhashjointopk_rowWriter1.write(4, 
shuffledhashjointopk_value27);
    /* 204 */
    /* 205 */         shuffledhashjointopk_rowWriter1.write(5, 
shuffledhashjointopk_value28);
    /* 206 */
    /* 207 */         if (shuffledhashjointopk_isNull21) {
    /* 208 */           shuffledhashjointopk_rowWriter1.setNullAt(6);
    /* 209 */         } else {
    /* 210 */           shuffledhashjointopk_rowWriter1.write(6, 
shuffledhashjointopk_value29);
    /* 211 */         }
    /* 212 */
    /* 213 */         if (shuffledhashjointopk_isNull22) {
    /* 214 */           shuffledhashjointopk_rowWriter1.setNullAt(7);
    /* 215 */         } else {
    /* 216 */           shuffledhashjointopk_rowWriter1.write(7, 
shuffledhashjointopk_value30);
    /* 217 */         }
    /* 218 */
    /* 219 */         shuffledhashjointopk_rowWriter1.write(8, 
shuffledhashjointopk_value31);
    /* 220 */
    /* 221 */         shuffledhashjointopk_rowWriter1.write(9, 
shuffledhashjointopk_value32);
    /* 222 */         
shuffledhashjointopk_result1.setTotalSize(shuffledhashjointopk_holder1.totalSize());
    /* 223 */         append(shuffledhashjointopk_result1.copy());
    /* 224 */
    /* 225 */       }
    /* 226 */
    /* 227 */       if (shouldStop()) return;
    /* 228 */     }
    /* 229 */   }
    /* 230 */ }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to