Hi Liang-Chi,

The code snippet is below. If I bind the encoder early (the schema
doesn't change throughout the execution), the final result is a list of
identical entries.
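In case it helps frame the question: N identical entries is the symptom you'd see if the bound encoder reuses one mutable row buffer across toRow() calls and the result is stored without a copy. A Spark-free sketch of that failure mode (all names here are invented for illustration, this is only a hypothesis about the cause):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Stand-in for an encoder whose toRow() reuses a single mutable buffer,
// the way a bound projection can reuse its output row.
class ReusingEncoder {
    private final int[] buffer = new int[1];

    int[] toRow(int value) {
        buffer[0] = value; // overwrites the previous result in place
        return buffer;     // same array instance on every call
    }
}

public class AliasingDemo {
    public static void main(String[] args) {
        ReusingEncoder encoder = new ReusingEncoder();

        List<int[]> aliased = new ArrayList<>();
        List<int[]> copied = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            aliased.add(encoder.toRow(i));        // stores the shared buffer
            copied.add(encoder.toRow(i).clone()); // defensive copy
        }

        // Every aliased entry shows the last value written; the copies differ.
        System.out.println(aliased.stream().map(Arrays::toString).collect(Collectors.toList())); // [[3], [3], [3]]
        System.out.println(copied.stream().map(Arrays::toString).collect(Collectors.toList()));  // [[1], [2], [3]]
    }
}
```

If that is what's happening, the InternalRow coming out of toRow() would need a copy before being stored in the buffer; whether that actually applies to the bound encoder here I can't say, so treat it purely as a working theory.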

@RequiredArgsConstructor
public class UDAF extends UserDefinedAggregateFunction {

    // Do not resolve and bind this expression encoder eagerly
    private final ExpressionEncoder<Row> unboundedEncoder;
    private final StructType schema;

    @Override
    public StructType inputSchema() {
        return schema;
    }

    @Override
    public StructType bufferSchema() {
        // A UserDefinedType is a DataType, not a StructType, so wrap the UDT
        // in a single-field struct (field name is arbitrary)
        return new StructType().add("data", new TopKDataType(schema, unboundedEncoder));
    }

    @Override
    public DataType dataType() {
        return DataTypes.createArrayType(schema);
    }

    @Override
    public void initialize(MutableAggregationBuffer buffer) {
        // Seed the buffer with an empty wrapper; update()/evaluate() read it
        // back as a TopKDataType, not as a bare InternalRow[]
        // (constructor assumed from the fields shown below)
        buffer.update(0, new TopKDataType(unboundedEncoder, new ArrayList<>()));
    }

    @Override
    public void update(MutableAggregationBuffer buffer, Row input) {
        TopKDataType data = buffer.getAs(0);

        data.add(unboundedEncoder.toRow(input));

        buffer.update(0, data);
    }

    @Override
    public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
        TopKDataType data1 = buffer1.getAs(0);

        // merge buffer2's rows into data1 (elided)

        buffer1.update(0, data1);
    }

    @Override
    public Object evaluate(Row buffer) {
        TopKDataType data = buffer.getAs(0);

        // Need to return Row here instead of InternalRow
        return data.rows();
    }

    static ExpressionEncoder<Row> resolveAndBind(ExpressionEncoder<Row> encoder) {
        val attributes = JavaConversions.asJavaCollection(encoder.schema().toAttributes())
                .stream().map(Attribute::toAttribute).collect(Collectors.toList());
        return encoder.resolveAndBind(ScalaUtils.scalaSeq(attributes), SimpleAnalyzer$.MODULE$);
    }
}

// Wrap around a list of InternalRow
class TopKDataType extends UserDefinedType<TopKDataType> {
    private final ExpressionEncoder<Row> unboundedEncoder;
    private final List<InternalRow> data;

    public Row[] rows() {
        val encoder = resolveAndBind(this.unboundedEncoder);

        return data.stream().map(encoder::fromRow).toArray(Row[]::new);
    }
}
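Since rows() resolves and binds the encoder on every call, one way to keep the binding late but pay for it only once per instance would be a memoizing supplier. A generic plain-Java sketch (the Lazy class and its wiring into TopKDataType are my invention, not anything from Spark):

```java
import java.util.function.Supplier;

// Memoizing supplier: defers an expensive computation (e.g. resolveAndBind)
// until first use, then caches the result for subsequent calls.
class Lazy<T> implements Supplier<T> {
    private final Supplier<T> delegate;
    private T value;

    Lazy(Supplier<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized T get() {
        if (value == null) {
            value = delegate.get(); // computed once, on first access
        }
        return value;
    }
}
```

Usage would be something like `private final Lazy<ExpressionEncoder<Row>> bound = new Lazy<>(() -> resolveAndBind(this.unboundedEncoder));` with `bound.get()` inside rows(). That said, since eager binding is exactly what triggers the duplicate-entry problem, I'd only cache the bound encoder once the root cause is understood.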

-------
Regards,
Andy

On Fri, Jan 6, 2017 at 3:48 AM, Liang-Chi Hsieh <vii...@gmail.com> wrote:

>
> Can you show how you use the encoder in your UDAF?
>
>
> Andy Dang wrote
> > One more question about the behavior of ExpressionEncoder<Row>.
> >
> > I have a UDAF that has an ExpressionEncoder<Row> as a member variable.
> >
> > However, if I call resolveAndBind() eagerly on this encoder, it appears to
> > break the UDAF. Basically, somehow the deserialized rows are all the same
> > during the merge step. Is this the expected behavior of Encoders?
> >
> > -------
> > Regards,
> > Andy
> >
> > On Thu, Jan 5, 2017 at 10:55 AM, Andy Dang <namd88@> wrote:
> >
> >> Perfect. The API in Java is a bit clumsy though.
> >>
> >> What I ended up doing in Java (the val is from lombok, if anyone's
> >> wondering):
> >>         val attributes = JavaConversions.asJavaCollection(schema.toAttributes())
> >>                 .stream().map(Attribute::toAttribute).collect(Collectors.toList());
> >>         val encoder = RowEncoder.apply(schema)
> >>                 .resolveAndBind(ScalaUtils.scalaSeq(attributes), SimpleAnalyzer$.MODULE$);
> >>
> >>
> >> -------
> >> Regards,
> >> Andy
> >>
> >> On Thu, Jan 5, 2017 at 2:53 AM, Liang-Chi Hsieh <viirya@> wrote:
> >>
> >>>
> >>> You need to resolve and bind the encoder.
> >>>
> >>> ExpressionEncoder<Row> encoder = RowEncoder.apply(struct).resolveAndBind();
> >>>
> >>>
> >>> Andy Dang wrote
> >>> > Hi all,
> >>> > (cc-ing dev since I've hit some developer API corner)
> >>> >
> >>> > What's the best way to convert an InternalRow to a Row if I've got an
> >>> > InternalRow and the corresponding Schema.
> >>> >
> >>> > Code snippet:
> >>> >     @Test
> >>> >     public void foo() throws Exception {
> >>> >         Row row = RowFactory.create(1);
> >>> >         StructType struct = new StructType().add("id", DataTypes.IntegerType);
> >>> >         ExpressionEncoder<Row> encoder = RowEncoder.apply(struct);
> >>> >         InternalRow internalRow = encoder.toRow(row);
> >>> >         System.out.println("Internal row size: " + internalRow.numFields());
> >>> >         Row roundTrip = encoder.fromRow(internalRow);
> >>> >         System.out.println("Round trip: " + roundTrip.size());
> >>> >     }
> >>> >
> >>> > The code fails at the line encoder.fromRow() with the exception:
> >>> >> Caused by: java.lang.UnsupportedOperationException: Cannot evaluate expression: getcolumnbyordinal(0, IntegerType)
> >>> >
> >>> > -------
> >>> > Regards,
> >>> > Andy
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> -----
> >>> Liang-Chi Hsieh | @viirya
> >>> Spark Technology Center
> >>> http://www.spark.tc/
> >>> --
> >>> View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Converting-an-InternalRow-to-a-Row-tp20460p20465.html
> >>> Sent from the Apache Spark Developers List mailing list archive at
> >>> Nabble.com.
> >>>
> >>> ---------------------------------------------------------------------
> >>> To unsubscribe e-mail: dev-unsubscribe@spark.apache.org
> >>>
> >>>
> >>
>
>
>
>
>
> -----
> Liang-Chi Hsieh | @viirya
> Spark Technology Center
> http://www.spark.tc/
> --
> View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Converting-an-InternalRow-to-a-Row-tp20460p20487.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>
