luke de feo created BEAM-12255:
----------------------------------
Summary: Group.aggregateField does not work with combine fns that
return list[T]
Key: BEAM-12255
URL: https://issues.apache.org/jira/browse/BEAM-12255
Project: Beam
Issue Type: Bug
Components: beam-model
Affects Versions: 2.28.0
Reporter: luke de feo
I am trying to use the Group API to apply multiple aggregations to a Row. When
I add a combine fn whose output is a list[T], the job fails at run time:
java.lang.RuntimeException: Couldn't find field type for T
    at org.apache.beam.sdk.schemas.FieldTypeDescriptors.fieldTypeForJavaType(FieldTypeDescriptors.java:98)
    at org.apache.beam.sdk.schemas.FieldTypeDescriptors.getArrayFieldType(FieldTypeDescriptors.java:118)
    at org.apache.beam.sdk.schemas.FieldTypeDescriptors.fieldTypeForJavaType(FieldTypeDescriptors.java:86)
    at org.apache.beam.sdk.schemas.transforms.SchemaAggregateFn$Inner.aggregateFields(SchemaAggregateFn.java:234)
    at org.apache.beam.sdk.schemas.transforms.Group$ByFields.aggregateField(Group.java:648)
It looks like a type erasure issue. Is there a workaround? I am happy to
tell Beam manually what the inner type is.
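To show what I mean by type erasure, here is a minimal plain-Java sketch. It does not use Beam, and I have not checked that Beam resolves types exactly this way. `ListOutputFn`, `IntListOutputFn` and `elementTypeOf` are hypothetical names made up for the illustration. The sketch assumes the fn has a single type parameter. When the generic class is instantiated directly, reflection only sees the type variable `T` as the list element type. When a subclass binds `T`, the binding is kept in the class metadata and can be recovered.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.List;

public class ErasureSketch {
    // Hypothetical stand-in for a generic combine fn whose output is List<T>.
    public static class ListOutputFn<T> {
        public List<T> extractOutput() {
            return null;
        }
    }

    // Hypothetical subclass that binds T to Integer in its declaration.
    public static class IntListOutputFn extends ListOutputFn<Integer> {}

    // Returns the element type of extractOutput() as reflection sees it for fnClass.
    // Assumption: T is the first (and only) type parameter of the superclass.
    public static Type elementTypeOf(Class<?> fnClass) throws NoSuchMethodException {
        Type returnType =
            ListOutputFn.class.getMethod("extractOutput").getGenericReturnType();
        Type element = ((ParameterizedType) returnType).getActualTypeArguments()[0];

        // If a subclass declared "extends ListOutputFn<Something>", use that binding.
        Type superType = fnClass.getGenericSuperclass();
        if (element instanceof TypeVariable && superType instanceof ParameterizedType) {
            return ((ParameterizedType) superType).getActualTypeArguments()[0];
        }
        // Otherwise the element type is still the unresolved type variable.
        return element;
    }

    public static void main(String[] args) throws Exception {
        // Direct instantiation: the Integer argument is erased, only T remains.
        System.out.println(elementTypeOf(new ListOutputFn<Integer>().getClass()));
        // Subclass binding: Integer is recoverable from the class declaration.
        System.out.println(elementTypeOf(new IntListOutputFn().getClass()));
    }
}
```

If the same thing happens inside Beam, a concrete subclass that fixes the type parameter might be one way to supply the inner type, but that is a guess I have not tested.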
Example test case (in Scala):
{code:java}
"data set " should "be aggregatable with group by" in {
  val schema = Schema.of(
    Schema.Field.nullable("key", Schema.FieldType.STRING),
    Schema.Field.nullable("value1", Schema.FieldType.INT32),
    Schema.Field.nullable("value2", Schema.FieldType.INT32)
  )
  val keys = Vector("foo", "bar", "baz")
  val inputRaw = 1.to(1000).map(_ => {
    Row.withSchema(schema)
      .addValues(
        keys(Random.nextInt(keys.length)),
        Random.nextInt(10000),
        (Random.nextInt(100) + 256),
      ).build()
  })
  testPipeline { p =>
    val quantileFn =
      ApproximateQuantilesCombineFn.create[java.lang.Integer](20)
    val outputType = quantileFn.getOutputType
    println(outputType)
    val input = p
      .apply(Create.of(inputRaw.asJava))
      .setRowSchema(schema)
    val res = input
      .apply(Group.byFieldNames("key")
        .aggregateField("value1", quantileFn, "quantiles"))
      .apply(MapDoFn.of(x => {
        println(x.toString)
        x.toString
      })).setCoder(StringUtf8Coder.of())
    PAssert.that(res).containsInAnyOrder()
  }
}
}{code}