This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6f5543f [BEAM-9615] Make UUIDs deterministic on Go types. (#14773)
6f5543f is described below
commit 6f5543f1e8d60aa93f4f4c08e0542f4a60f533f4
Author: Robert Burke <[email protected]>
AuthorDate: Mon May 10 15:47:03 2021 -0700
[BEAM-9615] Make UUIDs deterministic on Go types. (#14773)
Co-authored-by: lostluck <[email protected]>
---
.../core/runtime/graphx/schema/logicaltypes.go | 1 -
.../pkg/beam/core/runtime/graphx/schema/schema.go | 25 ++++++++++++++++------
2 files changed, 19 insertions(+), 7 deletions(-)
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/logicaltypes.go b/sdks/go/pkg/beam/core/runtime/graphx/schema/logicaltypes.go
index a011a78..0bd33bd 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/schema/logicaltypes.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/logicaltypes.go
@@ -51,7 +51,6 @@ type LogicalTypeProvider = func(reflect.Type) (reflect.Type, error)
// Registry retains mappings from go types to Schemas and LogicalTypes.
type Registry struct {
- lastShortID int64
typeToSchema map[reflect.Type]*pipepb.Schema
idToType map[string]reflect.Type
syntheticToUser map[reflect.Type]reflect.Type
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
index abb6f10..087d8c1 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
@@ -26,7 +26,9 @@
package schema
import (
+ "bytes"
"fmt"
+ "hash/fnv"
"reflect"
"strings"
@@ -72,8 +74,19 @@ func RegisterType(ut reflect.Type) {
defaultRegistry.RegisterType(ut)
}
-func getUUID() string {
- return uuid.New().String()
+// getUUID deterministically derives a UUID from the FNV-128a hash of ut.String(). NOTE(review): reflect.Type.String is not guaranteed unique (it uses short package names), so distinct types with the same package and type name get the same ID.
+func getUUID(ut reflect.Type) string {
+ // String produces non-empty output for pointer and slice types.
+ typename := ut.String()
+ hasher := fnv.New128a()
+ if n, err := hasher.Write([]byte(typename)); err != nil || n != len(typename) {
+ panic(fmt.Sprintf("unable to generate schema uuid for %s, wrote out %d bytes, want %d: err %v", typename, n, len(typename), err))
+ }
+ id, err := uuid.NewRandomFromReader(bytes.NewBuffer(hasher.Sum(nil))) // the hash digest stands in for the random source, so the ID is stable per type string
+ if err != nil {
+ panic(fmt.Sprintf("unable to generate schema uuid for type %s: %v", typename, err))
+ }
+ return id.String()
}
// Registered returns whether the given type has been registered with
@@ -350,7 +363,7 @@ func (r *Registry) fromType(ot reflect.Type) (*pipepb.Schema, error) {
if lID != "" {
schm.Options = append(schm.Options, logicalOption(lID))
}
- schm.Id = getUUID()
+ schm.Id = getUUID(ot)
r.typeToSchema[ot] = schm
r.idToType[schm.GetId()] = ot
return schm, nil
@@ -365,7 +378,7 @@ func (r *Registry) fromType(ot reflect.Type) (*pipepb.Schema, error) {
// Cache the pointer type here with it's own id.
pt := reflect.PtrTo(t)
schm = proto.Clone(schm).(*pipepb.Schema)
- schm.Id = getUUID()
+ schm.Id = getUUID(pt)
schm.Options = append(schm.Options, &pipepb.Option{
Name: optGoNillable,
})
@@ -454,7 +467,7 @@ func (r *Registry) structToSchema(t reflect.Type) (*pipepb.Schema, error) {
schm := ftype.GetRowType().GetSchema()
schm = proto.Clone(schm).(*pipepb.Schema)
schm.Options = append(schm.Options, logicalOption(lID))
- schm.Id = getUUID()
+ schm.Id = getUUID(t)
r.typeToSchema[t] = schm
r.idToType[schm.GetId()] = t
return schm, nil
@@ -483,7 +496,7 @@ func (r *Registry) structToSchema(t reflect.Type) (*pipepb.Schema, error) {
schm := &pipepb.Schema{
Fields: fields,
- Id: getUUID(),
+ Id: getUUID(t),
}
r.idToType[schm.GetId()] = t
r.typeToSchema[t] = schm