jeremyje opened a new issue, #25721:
URL: https://github.com/apache/beam/issues/25721

   ### What happened?
   
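   Using the Go SDK's `parquetio.Write` with the Google Cloud Dataflow runner fails: the job is never submitted because generating the model pipeline fails (see the error output below). Run the code example below to reproduce the error.
   The same code works with the direct runner.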
   
   ```go
   // Test code for writing a Parquet file using the example structure in Beam's parquetio package.
   // Based on the example presented at https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.45.0/go/pkg/beam/io/parquetio.
   // Run with either of:
   // go run cmd/parquet/parquet.go -dataflow_service_options enable_prime -async -runner dataflow -project $PROJECT -region us-west1 -staging_location gs://$BUCKET/staging/
   // go run cmd/parquet/parquet.go -async -runner dataflow -project $PROJECT -region us-west1 -staging_location gs://$BUCKET/staging/
   package main
   
   import (
        "context"
        "flag"
        "fmt"
        "reflect"
   
        "github.com/apache/beam/sdks/v2/go/pkg/beam"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/io/parquetio"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
   
        _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"
   )
   
   // BUCKET is the GCS bucket that the example's Parquet output is written
   // under (gs://BUCKET/parquet/student.parquet). Replace the placeholder with
   // a bucket you can write to before running the pipeline.
   const BUCKET = "REPLACE_WITH_YOUR_GCS_BUCKET"
   
   // Student is the example record written to Parquet. Each `parquet` struct
   // tag names the output column and gives its Parquet type options; a field
   // without a tag is not written.
   //
   // Keep every tag on a single line: reflect.StructTag unquotes the tag value
   // with strconv.Unquote, which rejects a raw newline. A wrapped tag is
   // therefore silently unreadable, and its field would be treated as untagged.
   type Student struct {
           Name    string  `parquet:"name=name, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
           Age     int32   `parquet:"name=age, type=INT32, encoding=PLAIN"`
           Id      int64   `parquet:"name=id, type=INT64"`
           Weight  float32 `parquet:"name=weight, type=FLOAT"`
           Sex     bool    `parquet:"name=sex, type=BOOLEAN"`
           Day     int32   `parquet:"name=day, type=INT32, convertedtype=DATE"`
           Ignored int32   // No parquet tag, so this field is not written.
   }
   
   // main builds a pipeline containing a single Student record, writes it as a
   // Parquet file under BUCKET with parquetio.Write, and runs the pipeline on
   // whichever runner the command-line flags select.
   func main() {
           // Flags are parsed before beam.Init, matching the original order.
           flag.Parse()
           beam.Init()

           ctx := context.Background()
           pipeline, root := beam.NewPipelineWithRoot()

           record := Student{
                   Name:    "Apache Beam",
                   Age:     5,
                   Id:      9000,
                   Weight:  350,
                   Sex:     false,
                   Day:     2,
                   Ignored: 101, // Not written: the field has no parquet tag.
           }
           students := beam.CreateList(root, []Student{record})

           outputPath := fmt.Sprintf("gs://%s/parquet/student.parquet", BUCKET)
           studentType := reflect.TypeOf((*Student)(nil)).Elem()
           parquetio.Write(root, outputPath, studentType, students)

           if err := beamx.Run(ctx, pipeline); err != nil {
                   log.Exitf(ctx, "Failed to execute job: %v", err)
           }
   }
   ```
   
   ```bash
   2023/03/05 22:51:35 No environment config specified. Using default config: 
'apache/beam_go_sdk:2.45.0'
   2023/03/05 22:51:35 Failed to execute job:      generating model pipeline
   failed to add scope tree: &{{parquetio.Write root/parquetio.Write} 
[{github.com/apache/beam/sdks/v2/go/pkg/beam.addFixedKeyFn 3: ParDo [In(Main): 
T <- {2: main.Student/R[main.Student] GLO}] -> [Out: KV<int,T> -> {3: 
KV<int,main.Student>/KV<int[varintz],R[main.Student]> GLO}]} {CoGBK 4: CoGBK 
[In(Main): KV<int,main.Student> <- {3: 
KV<int,main.Student>/KV<int[varintz],R[main.Student]> GLO}] -> [Out: 
CoGBK<int,main.Student> -> {4: 
CoGBK<int,main.Student>/CoGBK<int[varintz],R[main.Student]> GLO}]} 
{github.com/apache/beam/sdks/v2/go/pkg/beam/io/parquetio.parquetWriteFn 5: 
ParDo [In(Main): CoGBK<int,interface {}> <- {4: 
CoGBK<int,main.Student>/CoGBK<int[varintz],R[main.Student]> GLO}] -> []}] []}
           caused by:
   failed to add input kind: 
{github.com/apache/beam/sdks/v2/go/pkg/beam/io/parquetio.parquetWriteFn 5: 
ParDo [In(Main): CoGBK<int,interface {}> <- {4: 
CoGBK<int,main.Student>/CoGBK<int[varintz],R[main.Student]> GLO}] -> []}
           caused by:
   failed to serialize 5: ParDo [In(Main): CoGBK<int,interface {}> <- {4: 
CoGBK<int,main.Student>/CoGBK<int[varintz],R[main.Student]> GLO}] -> []
           caused by:
           encoding userfn 5: ParDo [In(Main): CoGBK<int,interface {}> <- {4: 
CoGBK<int,main.Student>/CoGBK<int[varintz],R[main.Student]> GLO}] -> []
   bad input type
           caused by:
           encoding full type interface {}
   bad type
           caused by:
   unencodable type 'interface'
   exit status 1
   ```
   
   `go version go1.20.1 linux/amd64`
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [ ] Component: Java SDK
   - [X] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [X] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to