gong023 opened a new issue, #22073:
URL: https://github.com/apache/beam/issues/22073
### What happened?
Using the Go SDK, I'd like to create a job that imports Avro data and then
exports it to BigQuery.
My script currently looks like this:
```go
package main

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"reflect"
	"time"

	"cloud.google.com/go/bigquery"
	"cloud.google.com/go/civil"
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/avroio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

var (
	projectID = flag.String("project_id", "my_project_id", "GCP project ID")
	input     = flag.String("input", "gs://my_bucket/hoge.avro", "Input avro file")
	dataset   = flag.String("dataset", "test_dataset", "Output Bigquery dataset")
	table     = flag.String("table", "test_table", "Output Bigquery table")
)

type Sample struct {
	ID   bigquery.NullInt64 `bigquery:"id" json:"id"`
	Date bigquery.NullDate  `bigquery:"date" json:"date"`
}

func AppendDate(src string) Sample {
	var s Sample
	_ = json.Unmarshal([]byte(src), &s)
	s.Date = bigquery.NullDate{Valid: true, Date: civil.DateOf(time.Now())}
	return s
}

func main() {
	flag.Parse()
	beam.Init()
	p := beam.NewPipeline()
	s := p.Root()
	orig := avroio.Read(s, *input, reflect.TypeOf(""))
	format := beam.ParDo(s, AppendDate, orig)
	t := fmt.Sprintf("%s:%s.%s", *projectID, *dataset, *table)
	bigqueryio.Write(s, *projectID, t, format)
	ctx := context.Background()
	if err := beamx.Run(ctx, p); err != nil {
		log.Fatal(err)
	}
}
```
Unfortunately, I get the error below **only when the runner is `dataflow`**:
```
$ go run cmd/terminal_info/main.go --runner dataflow
2022/06/28 16:16:22 No environment config specified. Using default config:
'apache/beam_go_sdk:2.39.0'
2022/06/28 16:16:22 failed to convert type main.Sample to a schema.
Full error:
generating model pipeline
failed to add input kind: {main.AppendDate 5: ParDo [In(Main): string <- {4:
string/string GLO}] -> [Out: main.Sample -> {5: main.Sample/R[main.Sample]
GLO}]}
caused by:
cannot convert field Date to schema
caused by:
unable to convert bigquery.NullDate to schema field
caused by:
cannot convert field Date to schema
caused by:
unable to convert civil.Date to schema field
caused by:
cannot convert field Month to schema
caused by:
unable to map time.Month to pipepb.AtomicType
exit status 1
```
According to the message, the `time.Month` field reached through
`bigquery.NullDate` and `civil.Date` does not seem to be accepted by this
SDK's schema conversion.
I understand this can be avoided by using a primitive type, such as
``Date int64 `bigquery:"date" json:"date"` ``. However, I want to use
`NullDate` as is.
Do you have any ideas for solving this error?
Also, the odd thing is that I don't get the above error **when I use
`DirectRunner`**. If you have any ideas about this as well, I'd like to know.
```
go run cmd/terminal_info/main.go
2022/06/28 16:26:07 Executing pipeline with the direct runner.
2022/06/28 16:26:07 Pipeline:
2022/06/28 16:26:07 Nodes: {1: []uint8/bytes GLO}
{2: string/string GLO}
{3: string/string GLO}
{4: string/string GLO}
{5: main.Sample/R[main.Sample] GLO}
{6: KV<int,main.Sample>/KV<int[varintz],R[main.Sample]> GLO}
{7: CoGBK<int,main.Sample>/CoGBK<int[varintz],R[main.Sample]> GLO}
Edges: 1: Impulse [] -> [Out: []uint8 -> {1: []uint8/bytes GLO}]
2: ParDo [In(Main): []uint8 <- {1: []uint8/bytes GLO}] -> [Out: T -> {2:
string/string GLO}]
3: ParDo [In(Main): string <- {2: string/string GLO}] -> [Out: string -> {3:
string/string GLO}]
4: ParDo [In(Main): string <- {3: string/string GLO}] -> [Out: X -> {4:
string/string GLO}]
5: ParDo [In(Main): string <- {4: string/string GLO}] -> [Out: main.Sample
-> {5: main.Sample/R[main.Sample] GLO}]
6: ParDo [In(Main): T <- {5: main.Sample/R[main.Sample] GLO}] -> [Out:
KV<int,T> -> {6: KV<int,main.Sample>/KV<int[varintz],R[main.Sample]> GLO}]
7: CoGBK [In(Main): KV<int,main.Sample> <- {6:
KV<int,main.Sample>/KV<int[varintz],R[main.Sample]> GLO}] -> [Out:
CoGBK<int,main.Sample> -> {7:
CoGBK<int,main.Sample>/CoGBK<int[varintz],R[main.Sample]> GLO}]
8: ParDo [In(Main): CoGBK<int,X> <- {7:
CoGBK<int,main.Sample>/CoGBK<int[varintz],R[main.Sample]> GLO}] -> []
2022/06/28 16:26:07 Plan[plan]:
9: Impulse[0]
1: ParDo[bigqueryio.writeFn] Out:[]
2: CoGBK. Out:1
3: Inject[0]. Out:2
4: ParDo[beam.addFixedKeyFn] Out:[3]
5: ParDo[main.AppendDate] Out:[4]
6: ParDo[avroio.avroReadFn] Out:[5]
7: ParDo[avroio.expandFn] Out:[6]
8: ParDo[beam.createFn] Out:[7]
...
```
Thank you.
### Issue Priority
Priority: 1
### Issue Component
Component: sdk-go