damccorm commented on code in PR #33988:
URL: https://github.com/apache/beam/pull/33988#discussion_r1965711639
##########
sdks/go/pkg/beam/io/bigqueryio/bigquery.go:
##########
@@ -190,13 +191,35 @@ func mustInferSchema(t reflect.Type) bigquery.Schema {
if t.Kind() != reflect.Struct {
panic(fmt.Sprintf("schema type must be struct: %v", t))
}
+
+ registerTypeIfNeeded(t)
+
schema, err := bigquery.InferSchema(reflect.Zero(t).Interface())
if err != nil {
panic(errors.Wrapf(err, "invalid schema type: %v", t))
}
return schema
}
+func registerTypeIfNeeded(t reflect.Type) {
+ t = reflectx.SkipPtr(t)
+ key, ok := runtime.TypeKey(t)
+ if !ok {
+ panic(fmt.Sprintf("type %v must be a named type (not anonymous)
for registration", t))
+ }
+
+ // Check if Beam has already been initialized.
+ if beam.Initialized() {
Review Comment:
I think beam will always be initialized before this is called since this is
part of pipeline construction
(https://beam.apache.org/documentation/programming-guide/#creating-a-pipeline).
This update also will break existing piupelines which do have types registered
correctly already.
I think we can remove this block, and just panic instead of calling
`runtime.RegisterType(t)` below (and probably we can rename this function to
something like `checkTypeRegistered`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]