Hi Liang-Chi,

The code snippet is below. If I bind the encoder eagerly (the schema doesn't change throughout the execution), every entry in the final result list is the same.
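To make the symptom concrete, here is a minimal plain-Java sketch with no Spark involved. ReusingEncoder, collectWithoutCopy and collectWithCopy are hypothetical names made up for illustration; they are not Spark classes. The sketch only shows how storing references to one reused mutable buffer produces a list of identical entries. I have not confirmed that this is what happens in the UDAF, but as far as I can tell from the toRow() scaladoc, ExpressionEncoder is allowed to return the same InternalRow object on repeated calls, so it is one thing I cannot rule out.

```java
import java.util.ArrayList;
import java.util.List;

class ReusedRowDemo {

    /** Hypothetical stand-in for an encoder that writes every result into one shared buffer. */
    static final class ReusingEncoder {
        private final int[] shared = new int[1];

        int[] encode(int value) {
            shared[0] = value;
            return shared; // same array instance on every call
        }
    }

    /** Stores the encoder's result directly, so every element aliases the shared buffer. */
    static List<int[]> collectWithoutCopy(int... values) {
        ReusingEncoder encoder = new ReusingEncoder();
        List<int[]> out = new ArrayList<>();
        for (int v : values) {
            out.add(encoder.encode(v));
        }
        return out;
    }

    /** Clones the encoder's result before storing it, so each element keeps its own value. */
    static List<int[]> collectWithCopy(int... values) {
        ReusingEncoder encoder = new ReusingEncoder();
        List<int[]> out = new ArrayList<>();
        for (int v : values) {
            out.add(encoder.encode(v).clone());
        }
        return out;
    }

    /** Renders the first field of each stored row as a list string. */
    static String render(List<int[]> rows) {
        List<Integer> firstFields = new ArrayList<>();
        for (int[] row : rows) {
            firstFields.add(row[0]);
        }
        return firstFields.toString();
    }

    public static void main(String[] args) {
        System.out.println(render(collectWithoutCopy(1, 2, 3))); // prints [3, 3, 3]
        System.out.println(render(collectWithCopy(1, 2, 3)));    // prints [1, 2, 3]
    }
}
```

If the same thing applies to the UDAF, the analogous change would be to copy each InternalRow before adding it to the buffer, but that is an assumption on my side rather than something I have verified.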

@RequiredArgsConstructor
public class UDAF extends UserDefinedAggregateFunction {

    // Do not resolve and bind this expression encoder eagerly
    private final ExpressionEncoder<Row> unboundedEncoder;
    private final StructType schema;

    @Override
    public StructType inputSchema() {
        return schema;
    }

    @Override
    public StructType bufferSchema() {
        return new UserDefineType(schema, unboundedEncoder);
    }

    @Override
    public DataType dataType() {
        return DataTypes.createArrayType(schema);
    }

    @Override
    public void initialize(MutableAggregationBuffer buffer) {
        buffer.update(0, new InternalRow[0]);
    }

    @Override
    public void update(MutableAggregationBuffer buffer, Row input) {
        UserDefineType data = buffer.getAs(0);
        data.add(unboundedEncoder.toRow(input));
        buffer.update(0, data);
    }

    @Override
    public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
        // merge
        buffer1.update(0, data1);
    }

    @Override
    public Object evaluate(Row buffer) {
        UserDefineType data = buffer.getAs(0);
        // need to return Row here instead of Internal Row
        return data.rows();
    }

    static ExpressionEncoder<Row> resolveAndBind(ExpressionEncoder<Row> encoder) {
        val attributes = JavaConversions.asJavaCollection(encoder.schema().toAttributes())
            .stream().map(Attribute::toAttribute).collect(Collectors.toList());
        return encoder.resolveAndBind(ScalaUtils.scalaSeq(attributes), SimpleAnalyzer$.MODULE$);
    }
}

// Wrap around a list of InternalRow
class TopKDataType extends UserDefinedType<TopKDataType> {
    private final ExpressionEncoder<Row> unboundedEncoder;
    private final List<InternalRow> data;

    public Row[] rows() {
        val encoder = resolveAndBind(this.unboundedEncoder);
        return data.stream().map(encoder::fromRow).toArray(Row[]::new);
    }
}

-------
Regards,
Andy

On Fri, Jan 6, 2017 at 3:48 AM, Liang-Chi Hsieh <vii...@gmail.com> wrote:

>
> Can you show how you use the encoder in your UDAF?
>
>
> Andy Dang wrote
> > One more question about the behavior of ExpressionEncoder<Row>.
> >
> > I have a UDAF that has ExpressionEncoder<Row> as a member variable.
> >
> > However, if I call resolveAndBind() eagerly on this encoder, it appears to
> > break the UDAF. Basically, the deserialized rows somehow are all the same
> > during the merge step. Is this the expected behavior of Encoders?
> >
> > -------
> > Regards,
> > Andy
> >
> > On Thu, Jan 5, 2017 at 10:55 AM, Andy Dang <namd88@> wrote:
> >
> >> Perfect. The API in Java is a bit clumsy though.
> >>
> >> What I ended up doing in Java (the val is from lombok, if anyone's
> >> wondering):
> >> val attributes = JavaConversions.asJavaCollection(schema.toAttributes())
> >>     .stream().map(Attribute::toAttribute).collect(Collectors.toList());
> >> val encoder = RowEncoder.apply(schema)
> >>     .resolveAndBind(ScalaUtils.scalaSeq(attributes), SimpleAnalyzer$.MODULE$);
> >>
> >>
> >> -------
> >> Regards,
> >> Andy
> >>
> >> On Thu, Jan 5, 2017 at 2:53 AM, Liang-Chi Hsieh <viirya@> wrote:
> >>
> >>>
> >>> You need to resolve and bind the encoder.
> >>>
> >>> ExpressionEncoder<Row> encoder = RowEncoder.apply(struct).resolveAndBind();
> >>>
> >>>
> >>> Andy Dang wrote
> >>> > Hi all,
> >>> > (cc-ing dev since I've hit some developer API corner)
> >>> >
> >>> > What's the best way to convert an InternalRow to a Row if I've got an
> >>> > InternalRow and the corresponding Schema?
> >>> >
> >>> > Code snippet:
> >>> > @Test
> >>> > public void foo() throws Exception {
> >>> >     Row row = RowFactory.create(1);
> >>> >     StructType struct = new StructType().add("id", DataTypes.IntegerType);
> >>> >     ExpressionEncoder<Row> encoder = RowEncoder.apply(struct);
> >>> >     InternalRow internalRow = encoder.toRow(row);
> >>> >     System.out.println("Internal row size: " + internalRow.numFields());
> >>> >     Row roundTrip = encoder.fromRow(internalRow);
> >>> >     System.out.println("Round trip: " + roundTrip.size());
> >>> > }
> >>> >
> >>> > The code fails at the line encoder.fromRow() with the exception:
> >>> > Caused by: java.lang.UnsupportedOperationException: Cannot evaluate expression: getcolumnbyordinal(0, IntegerType)
> >>> >
> >>> > -------
> >>> > Regards,
> >>> > Andy
> >>>
> >>> -----
> >>> Liang-Chi Hsieh | @viirya
> >>> Spark Technology Center
> >>> http://www.spark.tc/
> >>> --
> >>> View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Converting-an-InternalRow-to-a-Row-tp20460p20465.html
> >>> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
> >>>
> >>> ---------------------------------------------------------------------
> >>> To unsubscribe e-mail: dev-unsubscribe@.apache
> >>
> >
>
> --
> View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Converting-an-InternalRow-to-a-Row-tp20460p20487.html