luke de feo created BEAM-12255:
----------------------------------

             Summary: Group.aggregateField does not work with combine fns that 
return list[T]
                 Key: BEAM-12255
                 URL: https://issues.apache.org/jira/browse/BEAM-12255
             Project: Beam
          Issue Type: Bug
          Components: beam-model
    Affects Versions: 2.28.0
            Reporter: luke de feo


I am trying to use the Group API to apply multiple aggregations to a Row. When 
I add a combine fn whose output is a list[T], the job fails at runtime:

 

Couldn't find field type for T
java.lang.RuntimeException: Couldn't find field type for T
 at org.apache.beam.sdk.schemas.FieldTypeDescriptors.fieldTypeForJavaType(FieldTypeDescriptors.java:98)
 at org.apache.beam.sdk.schemas.FieldTypeDescriptors.getArrayFieldType(FieldTypeDescriptors.java:118)
 at org.apache.beam.sdk.schemas.FieldTypeDescriptors.fieldTypeForJavaType(FieldTypeDescriptors.java:86)
 at org.apache.beam.sdk.schemas.transforms.SchemaAggregateFn$Inner.aggregateFields(SchemaAggregateFn.java:234)
 at org.apache.beam.sdk.schemas.transforms.Group$ByFields.aggregateField(Group.java:648)

 

It looks like a type erasure issue. Is there any workaround? I am happy to 
manually tell Beam what the inner type is.
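
To illustrate what I suspect is going on, here is a minimal sketch in plain Java. It is not Beam code: ListOutputFn is a hypothetical stand-in for a combine fn whose output is a list of T. Reflection on an instance created as ListOutputFn<Integer> still reports the element type as the unresolved type variable T, which would match the "Couldn't find field type for T" message if the schema inference works from the declared generic type.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {
    // Hypothetical stand-in (not a Beam class) for a combine fn with List<T> output.
    static class ListOutputFn<T> {
        public List<T> extractOutput() {
            return new ArrayList<>();
        }
    }

    // Returns the element type of extractOutput()'s declared return type,
    // as seen through reflection on the runtime class of fn.
    static Type reflectedElementType(Object fn) throws NoSuchMethodException {
        Method m = fn.getClass().getMethod("extractOutput");
        ParameterizedType listType = (ParameterizedType) m.getGenericReturnType();
        return listType.getActualTypeArguments()[0];
    }

    public static void main(String[] args) throws Exception {
        // The Integer argument exists only at compile time; the instance does not carry it.
        ListOutputFn<Integer> fn = new ListOutputFn<>();
        Type element = reflectedElementType(fn);
        // Prints: T is a type variable: true
        System.out.println(element + " is a type variable: " + (element instanceof TypeVariable));
    }
}
```

If this is the cause, any fix or workaround would need some way to supply the concrete element type explicitly.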

 

Example test case (in Scala):

 
{code:java}
  "data set " should "be aggregatable with group by" in {
    val schema = Schema.of(
      Schema.Field.nullable("key", Schema.FieldType.STRING),
      Schema.Field.nullable("value1", Schema.FieldType.INT32),
      Schema.Field.nullable("value2", Schema.FieldType.INT32)
    )

    val keys = Vector("foo", "bar", "baz")

    val inputRaw = 1.to(1000).map(_ => {
      Row.withSchema(schema)
        .addValues(
          keys(Random.nextInt(keys.length)),
          Random.nextInt(10000),
          (Random.nextInt(100) + 256),
        ).build()
    })


    testPipeline { p =>

      val quantileFn = ApproximateQuantilesCombineFn.create[java.lang.Integer](20)
      val outputType = quantileFn.getOutputType
      println(outputType)

      val input = p
        .apply(Create.of(inputRaw.asJava))
        .setRowSchema(schema)

      val res = input
        .apply(Group.byFieldNames("key")
          .aggregateField("value1", quantileFn, "quantiles"))
        .apply(MapDoFn.of(x => {
          println(x.toString)
          x.toString
        })).setCoder(StringUtf8Coder.of())

      PAssert.that(res).containsInAnyOrder()

    }

  }


{code}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
