[https://issues.apache.org/jira/browse/BEAM-9814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17108415#comment-17108415]
Brendan Stennett edited comment on BEAM-9814 at 5/15/20, 4:02 PM:
------------------------------------------------------------------
[~reuvenlax] This also happens in simple examples that never set a RowCoder explicitly:
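{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.Row;

// `options` is a PipelineOptions subtype exposing getOutput() (not shown here).
Pipeline p = Pipeline.create(options);
Schema schema = Schema.builder()
    .addField("value", Schema.FieldType.STRING)
    .build();
p.apply(Create.of("row1", "row2", "row3"))
    // Wrap each string in a single-field Row; no coder or schema is set on this output.
    .apply(ParDo.of(new DoFn<String, Row>() {
      @ProcessElement
      public void processElement(@Element String input, OutputReceiver<Row> out) {
        Row row = Row.withSchema(schema)
            .addValue(input)
            .build();
        out.output(row);
      }
    }))
    // Read the field back out of each Row.
    .apply(ParDo.of(new DoFn<Row, String>() {
      @ProcessElement
      public void processElement(@Element Row row, OutputReceiver<String> out) {
        out.output(row.getString("value"));
      }
    }))
    .apply("Write", TextIO.write().to(options.getOutput()));
{code}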
Edit: This exact example works in 2.20.0 but not in 2.21.0-SNAPSHOT or 2.22.0-SNAPSHOT.
was (Author: brendan6): the same comment and example, without the "Edit:" note.
> Error occurred when transforming from row to a new row without setCoder
> -----------------------------------------------------------------------
>
> Key: BEAM-9814
> URL: https://issues.apache.org/jira/browse/BEAM-9814
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Affects Versions: 2.19.0
> Reporter: Ruixue Liao
> Assignee: Reuven Lax
> Priority: Major
>
> The output row of a transform function is validated against the input row's
> schema, which causes an error. For example:
> {code}
> .apply(MapElements.via(
>     new SimpleFunction<Row, Row>() {
>       @Override
>       public Row apply(Row line) {
>         return Row.withSchema(newSchema).addValues("a", 1, "b").build();
>       }
>     }));
> {code}
> The error occurs whenever the output row's schema differs from the input
> row's schema. Adding {{.setCoder(RowCoder.of(newSchema))}} after the
> transform makes it work.
> Related link:
> [https://stackoverflow.com/questions/61236972/beam-sql-udf-to-split-one-column-into-multiple-columns]
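The failure mode the reporter describes can be illustrated without Beam. The sketch below is a toy model only, not Beam's implementation: {{ToySchema}}, {{ToyRow}} and {{ToyRowCoder}} are hypothetical stand-ins, and the field names {{f1}}, {{f2}}, {{f3}} are assumptions because the report does not show {{newSchema}}. It mimics the reported behaviour: a coder tied to the input schema rejects a row built with a different schema, while a coder built from the output schema (the role {{RowCoder.of(newSchema)}} plays in the workaround) encodes it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy model only: hypothetical stand-ins, NOT Beam's Schema/Row/RowCoder.
public class ToySchemaCoderDemo {

    // A "schema" here is just an ordered list of field names.
    static final class ToySchema {
        final List<String> fieldNames;
        ToySchema(String... names) {
            this.fieldNames = Collections.unmodifiableList(Arrays.asList(names));
        }
        @Override public boolean equals(Object o) {
            return o instanceof ToySchema && fieldNames.equals(((ToySchema) o).fieldNames);
        }
        @Override public int hashCode() { return fieldNames.hashCode(); }
    }

    // A row carries its own schema plus one value per field.
    static final class ToyRow {
        final ToySchema schema;
        final List<Object> values;
        ToyRow(ToySchema schema, Object... values) {
            if (values.length != schema.fieldNames.size()) {
                throw new IllegalArgumentException("value count does not match schema");
            }
            this.schema = schema;
            this.values = Collections.unmodifiableList(Arrays.asList(values));
        }
    }

    // A coder bound to exactly one schema; rows built with any other schema are refused.
    static final class ToyRowCoder {
        final ToySchema schema;
        private ToyRowCoder(ToySchema schema) { this.schema = schema; }
        static ToyRowCoder of(ToySchema schema) { return new ToyRowCoder(schema); }

        String encode(ToyRow row) {
            if (!row.schema.equals(schema)) {
                throw new IllegalStateException(
                    "row schema " + row.schema.fieldNames + " != coder schema " + schema.fieldNames);
            }
            List<String> parts = new ArrayList<>();
            for (Object v : row.values) {
                parts.add(String.valueOf(v));
            }
            return String.join(",", parts);
        }
    }

    public static void main(String[] args) {
        ToySchema inputSchema = new ToySchema("line");
        ToySchema newSchema = new ToySchema("f1", "f2", "f3"); // hypothetical field names
        // The transform's output: a row built with a different schema than its input.
        ToyRow out = new ToyRow(newSchema, "a", 1, "b");

        // Reported failure: the output is encoded with a coder for the INPUT schema.
        try {
            ToyRowCoder.of(inputSchema).encode(out);
            System.out.println("unexpected: encoded");
        } catch (IllegalStateException e) {
            System.out.println("fails: " + e.getMessage());
        }

        // Reported workaround: use a coder built from the OUTPUT schema.
        System.out.println("works: " + ToyRowCoder.of(newSchema).encode(out));
    }
}
```

Running {{main}} prints {{fails: row schema [f1, f2, f3] != coder schema [line]}} followed by {{works: a,1,b}}. In an actual pipeline the fix is the one given in the report, attaching the output schema's coder to the transform's output PCollection; {{PCollection.setRowSchema(newSchema)}} is another way to attach it.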
--
This message was sent by Atlassian Jira
(v8.3.4#803005)