gong023 opened a new issue, #22073:
URL: https://github.com/apache/beam/issues/22073

   ### What happened?
   
   Using the Go SDK, I'd like to create a job that imports Avro data and then exports it to BigQuery.
   
   My script currently looks like this:
   
   ```go
   package main
   
   import (
        "context"
        "encoding/json"
        "flag"
        "fmt"
        "log"
        "reflect"
        "time"
   
        "cloud.google.com/go/bigquery"
        "cloud.google.com/go/civil"
        "github.com/apache/beam/sdks/v2/go/pkg/beam"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/io/avroio"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
   )
   
   var (
        projectID = flag.String("project_id", "my_project_id", "GCP project ID")
        input     = flag.String("input", "gs://my_bucket/hoge.avro", "Input avro file")
        dataset   = flag.String("dataset", "test_dataset", "Output Bigquery dataset")
        table     = flag.String("table", "test_table", "Output Bigquery table")
   )
   
   type Sample struct {
        ID   bigquery.NullInt64 `bigquery:"id" json:"id"`
        Date bigquery.NullDate  `bigquery:"date" json:"date"`
   }
   
   func AppendDate(src string) Sample {
        var s Sample
        _ = json.Unmarshal([]byte(src), &s)
        s.Date = bigquery.NullDate{Valid: true, Date: civil.DateOf(time.Now())}
        return s
   }
   
   func main() {
        flag.Parse()
   
        beam.Init()
        p := beam.NewPipeline()
        s := p.Root()
   
        orig := avroio.Read(s, *input, reflect.TypeOf(""))
   
        format := beam.ParDo(s, AppendDate, orig)
   
        t := fmt.Sprintf("%s:%s.%s", *projectID, *dataset, *table)
        bigqueryio.Write(s, *projectID, t, format)
   
        ctx := context.Background()
        if err := beamx.Run(ctx, p); err != nil {
                log.Fatal(err)
        }
   }
   ```
   
   Unfortunately, I get the error below **only when the runner is `dataflow`**:
   
   ```
   $  go run cmd/terminal_info/main.go --runner dataflow 
   
   2022/06/28 16:16:22 No environment config specified. Using default config: 'apache/beam_go_sdk:2.39.0'
   2022/06/28 16:16:22 failed to convert type main.Sample to a schema.
   Full error:
           generating model pipeline
   failed to add input kind: {main.AppendDate 5: ParDo [In(Main): string <- {4: string/string GLO}] -> [Out: main.Sample -> {5: main.Sample/R[main.Sample] GLO}]}
           caused by:
   cannot convert field Date to schema
           caused by:
   unable to convert bigquery.NullDate to schema field
           caused by:
   cannot convert field Date to schema
           caused by:
   unable to convert civil.Date to schema field
           caused by:
   cannot convert field Month to schema
           caused by:
   unable to map time.Month to pipepb.AtomicType
   exit status 1
   ```
   
   According to the message, the types nested inside `bigquery.NullDate` (specifically `time.Month`) don't seem to be acceptable to this SDK's schema inference.
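
   For reference, here is roughly what schema inference has to walk (paraphrased type shapes, not the real library source; the package name is just a placeholder). The nested `time.Month` field looks like the part the final "unable to map time.Month to pipepb.AtomicType" line is pointing at:

   ```go
   // Sketch only: paraphrased shapes of the types involved.
   package shapes

   import (
        "time"

        "cloud.google.com/go/civil"
   )

   // bigquery.NullDate (paraphrased): a civil.Date plus a validity flag.
   type NullDate struct {
        Date  civil.Date
        Valid bool
   }

   // civil.Date (paraphrased): Month is a time.Month, a named integer type
   // ("type Month int"), which is what the error says cannot be mapped to a
   // pipepb.AtomicType.
   type Date struct {
        Year  int
        Month time.Month
        Day   int
   }
   ```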
   
   I understand this can be avoided with a primitive type, e.g. ``Date int64 `bigquery:"date" json:"date"` ``. However, I want to keep the field as a `NullDate`.
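
   If I did go the primitive-type route, a minimal sketch would look something like the following (meant to sit in the same package as the pipeline above; `SampleFlat` and `AppendDateFlat` are hypothetical names). The error above only complains about the `Date` field, so `ID` is left as `bigquery.NullInt64`; I have not verified how the BigQuery sink maps the string to the `DATE` column:

   ```go
   package main

   import (
        "encoding/json"
        "time"

        "cloud.google.com/go/bigquery"
        "cloud.google.com/go/civil"
   )

   // SampleFlat sketches the workaround: only Date changes, from
   // bigquery.NullDate to a plain string (an int64 would also satisfy schema
   // inference), so the Go SDK can build a schema for the struct.
   type SampleFlat struct {
        ID   bigquery.NullInt64 `bigquery:"id" json:"id"`
        Date string             `bigquery:"date" json:"date"` // "YYYY-MM-DD"
   }

   // AppendDateFlat mirrors AppendDate but stores today's date as a string.
   func AppendDateFlat(src string) SampleFlat {
        var s SampleFlat
        _ = json.Unmarshal([]byte(src), &s)
        s.Date = civil.DateOf(time.Now()).String() // RFC 3339 full-date, e.g. "2022-06-28"
        return s
   }
   ```

   In `main`, the ParDo would then use `AppendDateFlat` instead of `AppendDate`.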
   
   Do you have any ideas on how to solve this error?
   
   Also, the weird thing is that I don't get the above error **when I use the `DirectRunner`**. If you have any ideas about that too, I'd like to know.
   
   ```
    go run cmd/terminal_info/main.go
   2022/06/28 16:26:07 Executing pipeline with the direct runner.
   2022/06/28 16:26:07 Pipeline:
   2022/06/28 16:26:07 Nodes: {1: []uint8/bytes GLO}
   {2: string/string GLO}
   {3: string/string GLO}
   {4: string/string GLO}
   {5: main.Sample/R[main.Sample] GLO}
   {6: KV<int,main.Sample>/KV<int[varintz],R[main.Sample]> GLO}
   {7: CoGBK<int,main.Sample>/CoGBK<int[varintz],R[main.Sample]> GLO}
   Edges: 1: Impulse [] -> [Out: []uint8 -> {1: []uint8/bytes GLO}]
   2: ParDo [In(Main): []uint8 <- {1: []uint8/bytes GLO}] -> [Out: T -> {2: string/string GLO}]
   3: ParDo [In(Main): string <- {2: string/string GLO}] -> [Out: string -> {3: string/string GLO}]
   4: ParDo [In(Main): string <- {3: string/string GLO}] -> [Out: X -> {4: string/string GLO}]
   5: ParDo [In(Main): string <- {4: string/string GLO}] -> [Out: main.Sample -> {5: main.Sample/R[main.Sample] GLO}]
   6: ParDo [In(Main): T <- {5: main.Sample/R[main.Sample] GLO}] -> [Out: KV<int,T> -> {6: KV<int,main.Sample>/KV<int[varintz],R[main.Sample]> GLO}]
   7: CoGBK [In(Main): KV<int,main.Sample> <- {6: KV<int,main.Sample>/KV<int[varintz],R[main.Sample]> GLO}] -> [Out: CoGBK<int,main.Sample> -> {7: CoGBK<int,main.Sample>/CoGBK<int[varintz],R[main.Sample]> GLO}]
   8: ParDo [In(Main): CoGBK<int,X> <- {7: CoGBK<int,main.Sample>/CoGBK<int[varintz],R[main.Sample]> GLO}] -> []
   2022/06/28 16:26:07 Plan[plan]:
   9: Impulse[0]
   1: ParDo[bigqueryio.writeFn] Out:[]
   2: CoGBK. Out:1
   3: Inject[0]. Out:2
   4: ParDo[beam.addFixedKeyFn] Out:[3]
   5: ParDo[main.AppendDate] Out:[4]
   6: ParDo[avroio.avroReadFn] Out:[5]
   7: ParDo[avroio.expandFn] Out:[6]
   8: ParDo[beam.createFn] Out:[7]
   ...
   ```
   
   Thank you.
   
   ### Issue Priority
   
   Priority: 1
   
   ### Issue Component
   
   Component: sdk-go

