Hi Andy,

Thanks for sharing the code snippet.

I am not sure whether something is missing from the snippet, because some
method signatures do not match, e.g.,

    @Override
    public StructType bufferSchema() {
        return new UserDefineType(schema, unboundedEncoder);
    }


Maybe you defined a class UserDefineType that extends StructType.

Anyway, I noticed that in this line:

        data.add(unboundedEncoder.toRow(input));

If you read the doc comment on "toRow", you will find it says:

Note that multiple calls to toRow are allowed to return the same actual
[[InternalRow]] object.  Thus, the caller should copy the result before
making another call if required.

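I think that is why you get a list of identical entries: every element added
to the list can be the same reused row object, so all of them show the last
row that was written.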

So you may need to change it to:

        data.add(unboundedEncoder.toRow(input).copy());
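
To illustrate the effect outside Spark, here is a minimal sketch in plain
Java. MutableRow and ReusingEncoder are hypothetical stand-ins made up for
this example, not Spark classes; they only mimic an encoder whose toRow is
allowed to hand back the same object on every call.

```java
import java.util.ArrayList;
import java.util.List;

public class ReusedRowDemo {

    // Hypothetical stand-in for a mutable row object (not a Spark class).
    static final class MutableRow {
        int value;

        // Returns an independent snapshot of the current contents.
        MutableRow copy() {
            MutableRow snapshot = new MutableRow();
            snapshot.value = this.value;
            return snapshot;
        }
    }

    // Hypothetical stand-in for an encoder that reuses one output object,
    // which is the behavior the toRow comment says callers must expect.
    static final class ReusingEncoder {
        private final MutableRow reused = new MutableRow();

        MutableRow toRow(int input) {
            reused.value = input; // overwrites whatever was stored before
            return reused;        // same object on every call
        }
    }

    // Stores the returned object directly, so all entries alias one row.
    static List<Integer> collectWithoutCopy(int... inputs) {
        ReusingEncoder encoder = new ReusingEncoder();
        List<MutableRow> data = new ArrayList<>();
        for (int input : inputs) {
            data.add(encoder.toRow(input));
        }
        return values(data);
    }

    // Copies the result before the next toRow call, so entries stay distinct.
    static List<Integer> collectWithCopy(int... inputs) {
        ReusingEncoder encoder = new ReusingEncoder();
        List<MutableRow> data = new ArrayList<>();
        for (int input : inputs) {
            data.add(encoder.toRow(input).copy());
        }
        return values(data);
    }

    // Reads the values back out after all rows have been collected.
    static List<Integer> values(List<MutableRow> rows) {
        List<Integer> out = new ArrayList<>();
        for (MutableRow row : rows) {
            out.add(row.value);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collectWithoutCopy(1, 2, 3)); // prints [3, 3, 3]
        System.out.println(collectWithCopy(1, 2, 3));    // prints [1, 2, 3]
    }
}
```

Without the copy, the list holds three references to one object, so every
entry shows the last value written. With copy(), each entry keeps the value
it had when it was added.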



Andy Dang wrote
> Hi Liang-Chi,
> 
> The snippet of code is below. If I bind the encoder early (the schema
> doesn't change throughout the execution), the final result is a list of
> the same entries.
> 
> @RequiredArgsConstructor
> public class UDAF extends UserDefinedAggregateFunction {
> 
>     // Do not resolve and bind this expression encoder eagerly
>     private final ExpressionEncoder<Row> unboundedEncoder;
>     private final StructType schema;
> 
>     @Override
>     public StructType inputSchema() {
>         return schema;
>     }
> 
>     @Override
>     public StructType bufferSchema() {
>         return new UserDefineType(schema, unboundedEncoder);
>     }
> 
>     @Override
>     public DataType dataType() {
>         return DataTypes.createArrayType(schema);
>     }
> 
>     @Override
>     public void initialize(MutableAggregationBuffer buffer) {
>         buffer.update(0, new InternalRow[0]);
>     }
> 
>     @Override
>     public void update(MutableAggregationBuffer buffer, Row input) {
>         UserDefineType data = buffer.getAs(0);
> 
>         data.add(unboundedEncoder.toRow(input));
> 
>         buffer.update(0, data);
>     }
> 
>     @Override
>     public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
>         // merge
>         buffer1.update(0, data1);
>     }
> 
>     @Override
>     public Object evaluate(Row buffer) {
>         UserDefineType data = buffer.getAs(0);
> 
>        // need to return Row here instead of Internal Row
>         return data.rows();
>     }
> 
>     static ExpressionEncoder<Row> resolveAndBind(ExpressionEncoder<Row> encoder) {
>         val attributes = JavaConversions.asJavaCollection(encoder.schema().toAttributes())
>                 .stream().map(Attribute::toAttribute).collect(Collectors.toList());
>         return encoder.resolveAndBind(ScalaUtils.scalaSeq(attributes), SimpleAnalyzer$.MODULE$);
>     }
> }
> 
> // Wrap around a list of InternalRow
> class TopKDataType extends UserDefinedType<TopKDataType> {
>     private final ExpressionEncoder<Row> unboundedEncoder;
>     private final List<InternalRow> data;
> 
>     public Row[] rows() {
>         val encoder = resolveAndBind(this.unboundedEncoder);
> 
>         return data.stream().map(encoder::fromRow).toArray(Row[]::new);
>     }
> }
> 
> -------
> Regards,
> Andy
> 
> On Fri, Jan 6, 2017 at 3:48 AM, Liang-Chi Hsieh <viirya@> wrote:
> 
>>
>> Can you show how you use the encoder in your UDAF?
>>
>>
>> Andy Dang wrote
>> > One more question about the behavior of ExpressionEncoder<Row>.
>> >
>> > I have a UDAF that has ExpressionEncoder<Row> as a member variable.
>> >
>> > However, if I call resolveAndBind() eagerly on this encoder, it appears to
>> > break the UDAF. Basically, somehow the deserialized rows are all the same
>> > during the merge step. Is this the expected behavior of Encoders?
>> >
>> > -------
>> > Regards,
>> > Andy
>> >
>> > On Thu, Jan 5, 2017 at 10:55 AM, Andy Dang <namd88@> wrote:
>> >
>> >> Perfect. The API in Java is a bit clumsy, though.
>> >>
>> >> What I ended up doing in Java (the val is from lombok, if anyone's
>> >> wondering):
>> >>         val attributes = JavaConversions.asJavaCollection(schema.toAttributes())
>> >>                 .stream().map(Attribute::toAttribute).collect(Collectors.toList());
>> >>         val encoder = RowEncoder.apply(schema)
>> >>                 .resolveAndBind(ScalaUtils.scalaSeq(attributes), SimpleAnalyzer$.MODULE$);
>> >>
>> >>
>> >> -------
>> >> Regards,
>> >> Andy
>> >>
>> >> On Thu, Jan 5, 2017 at 2:53 AM, Liang-Chi Hsieh <viirya@> wrote:
>> >>
>> >>>
>> >>> You need to resolve and bind the encoder.
>> >>>
>> >>> ExpressionEncoder<Row> encoder = RowEncoder.apply(struct).resolveAndBind();
>> >>>
>> >>>
>> >>> Andy Dang wrote
>> >>> > Hi all,
>> >>> > (cc-ing dev since I've hit some developer API corner)
>> >>> >
>> >>> > What's the best way to convert an InternalRow to a Row if I've got an
>> >>> > InternalRow and the corresponding Schema?
>> >>> >
>> >>> > Code snippet:
>> >>> >     @Test
>> >>> >     public void foo() throws Exception {
>> >>> >         Row row = RowFactory.create(1);
>> >>> >         StructType struct = new StructType().add("id", DataTypes.IntegerType);
>> >>> >         ExpressionEncoder<Row> encoder = RowEncoder.apply(struct);
>> >>> >         InternalRow internalRow = encoder.toRow(row);
>> >>> >         System.out.println("Internal row size: " + internalRow.numFields());
>> >>> >         Row roundTrip = encoder.fromRow(internalRow);
>> >>> >         System.out.println("Round trip: " + roundTrip.size());
>> >>> >     }
>> >>> >
>> >>> > The code fails at the line encoder.fromRow() with the exception:
>> >>> >> Caused by: java.lang.UnsupportedOperationException: Cannot evaluate
>> >>> >> expression: getcolumnbyordinal(0, IntegerType)
>> >>> >
>> >>> > -------
>> >>> > Regards,
>> >>> > Andy
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> -----
>> >>> Liang-Chi Hsieh | @viirya
>> >>> Spark Technology Center
>> >>> http://www.spark.tc/
>> >>> --
>> >>> View this message in context:
>> >>> http://apache-spark-developers-list.1001551.n3.nabble.com/Converting-an-InternalRow-to-a-Row-tp20460p20465.html
>> >>> Sent from the Apache Spark Developers List mailing list archive at
>> >>> Nabble.com.
>> >>>
>> >>> ---------------------------------------------------------------------
>> >>> To unsubscribe e-mail: dev-unsubscribe@.apache
>>
>> >>>
>> >>>
>> >>
>>
>>
>>
>>
>>
>> -----
>> Liang-Chi Hsieh | @viirya
>> Spark Technology Center
>> http://www.spark.tc/
>> --
>> View this message in context:
>> http://apache-spark-developers-list.1001551.n3.nabble.com/Converting-an-InternalRow-to-a-Row-tp20460p20487.html
>> Sent from the Apache Spark Developers List mailing list archive at
>> Nabble.com.
>>





-----
Liang-Chi Hsieh | @viirya 
Spark Technology Center 
http://www.spark.tc/ 
--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Converting-an-InternalRow-to-a-Row-tp20460p20506.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
