This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a571952 [BEAM-12438] Add Regression test for issue around LP coding
Row coders. (#14924)
a571952 is described below
commit a571952e3ce470b3871ebd333eacb3d6368d2737
Author: Robert Burke <[email protected]>
AuthorDate: Mon Jun 7 09:45:26 2021 -0700
[BEAM-12438] Add Regression test for issue around LP coding Row coders.
(#14924)
* [BEAM-12438] Run regression during integration
* [BEAM-12438] Add LP error repro
* [BEAM-12438] Ignore extra LP on injects.
* [BEAM-12438] Populate schema option types.
Co-authored-by: zelliott
---
model/pipeline/src/main/proto/schema.proto | 6 +--
sdks/go/pkg/beam/core/runtime/exec/translate.go | 9 +++-
.../pkg/beam/core/runtime/graphx/schema/schema.go | 56 ++++++++++++++-----
.../beam/core/runtime/graphx/schema/schema_test.go | 16 ++----
sdks/go/test/regression/lperror.go | 63 ++++++++++++++++++++++
.../regression/{pardo_test.go => lperror_test.go} | 46 +++++-----------
sdks/go/test/regression/pardo_test.go | 35 ++++++------
.../{pardo_test.go => regression_test.go} | 38 ++-----------
sdks/go/test/run_validatesrunner_tests.sh | 4 +-
9 files changed, 158 insertions(+), 115 deletions(-)
diff --git a/model/pipeline/src/main/proto/schema.proto
b/model/pipeline/src/main/proto/schema.proto
index 837689f..bcab2e7 100644
--- a/model/pipeline/src/main/proto/schema.proto
+++ b/model/pipeline/src/main/proto/schema.proto
@@ -113,9 +113,9 @@ message LogicalType {
message Option {
// REQUIRED. Identifier for the option.
string name = 1;
- // OPTIONAL. Type specifer for the structure of value.
- // If not present, assumes no additional configuration is needed
- // for this option and value is ignored.
+ // REQUIRED. Type specifier for the structure of value.
+ // Conventionally, options that don't require additional configuration should
+ // use a boolean type, with the value set to true.
FieldType type = 2;
FieldValue value = 3;
}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go
b/sdks/go/pkg/beam/core/runtime/exec/translate.go
index fbbdab3..105eb82 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go
@@ -472,7 +472,14 @@ func (b *builder) makeLink(from string, id linkID) (Node,
error) {
if !coder.IsKV(c) {
return nil, errors.Errorf("unexpected inject
coder: %v", c)
}
- u = &Inject{UID: b.idgen.New(), N: (int)(tp.Inject.N),
ValueEncoder: MakeElementEncoder(c.Components[1]), Out: out[0]}
+ valCoder := c.Components[1]
+ // JIRA BEAM-12438 - an extra LP coder can get added here, but isn't added
+ // on decode. Strip it until we get a better fix.
+ if valCoder.Kind == coder.LP {
+ // strip unexpected length prefix coder.
+ valCoder = valCoder.Components[0]
+ }
+ u = &Inject{UID: b.idgen.New(), N: (int)(tp.Inject.N),
ValueEncoder: MakeElementEncoder(valCoder), Out: out[0]}
case graphx.URNExpand:
var pid string
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
index 087d8c1..2e3ea3f 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
@@ -356,9 +356,7 @@ func (r *Registry) fromType(ot reflect.Type)
(*pipepb.Schema, error) {
schm := ftype.GetRowType().GetSchema()
schm = proto.Clone(schm).(*pipepb.Schema)
if ot.Kind() == reflect.Ptr {
- schm.Options = append(schm.Options, &pipepb.Option{
- Name: optGoNillable,
- })
+ schm.Options = append(schm.Options, optGoNillable())
}
if lID != "" {
schm.Options = append(schm.Options, logicalOption(lID))
@@ -379,9 +377,7 @@ func (r *Registry) fromType(ot reflect.Type)
(*pipepb.Schema, error) {
pt := reflect.PtrTo(t)
schm = proto.Clone(schm).(*pipepb.Schema)
schm.Id = getUUID(pt)
- schm.Options = append(schm.Options, &pipepb.Option{
- Name: optGoNillable,
- })
+ schm.Options = append(schm.Options, optGoNillable())
r.idToType[schm.GetId()] = pt
r.typeToSchema[pt] = schm
@@ -392,14 +388,46 @@ func (r *Registry) fromType(ot reflect.Type)
(*pipepb.Schema, error) {
// Schema Option urns.
const (
// optGoNillable indicates that this top level schema should be returned as a pointer type.
- optGoNillable = "beam:schema:go:nillable:v1"
+ optGoNillableUrn = "beam:schema:go:nillable:v1"
// optGoEmbedded indicates that this field is an embedded type.
- optGoEmbedded = "beam:schema:go:embedded_field:v1"
+ optGoEmbeddedUrn = "beam:schema:go:embedded_field:v1"
// optGoLogical indicates that this top level schema has a logical type equivalent that needs to be looked up.
// It has a value type of String representing the URN for the logical type to look up.
- optGoLogical = "beam:schema:go:logical:v1"
+ optGoLogicalUrn = "beam:schema:go:logical:v1"
)
+func optGoNillable() *pipepb.Option {
+ return newToggleOption(optGoNillableUrn)
+}
+
+func optGoEmbedded() *pipepb.Option {
+ return newToggleOption(optGoEmbeddedUrn)
+}
+
+// newToggleOption constructs an Option whose presence is all
+// that matters, rather than other configuration. The option
+// is not set if the toggle isn't true, so the value is always
+// true.
+func newToggleOption(urn string) *pipepb.Option {
+ return &pipepb.Option{
+ Name: urn,
+ Type: &pipepb.FieldType{
+ TypeInfo: &pipepb.FieldType_AtomicType{
+ AtomicType: pipepb.AtomicType_BOOLEAN,
+ },
+ },
+ Value: &pipepb.FieldValue{
+ FieldValue: &pipepb.FieldValue_AtomicValue{
+ AtomicValue: &pipepb.AtomicTypeValue{
+ Value: &pipepb.AtomicTypeValue_Boolean{
+ Boolean: true,
+ },
+ },
+ },
+ },
+ }
+}
+
func checkOptions(opts []*pipepb.Option, urn string) *pipepb.Option {
for _, opt := range opts {
if opt.GetName() == urn {
@@ -412,7 +440,7 @@ func checkOptions(opts []*pipepb.Option, urn string)
*pipepb.Option {
// nillableFromOptions converts the passed in type to its pointer version
// if the option is present. This permits go types to be pointers.
func nillableFromOptions(opts []*pipepb.Option, t reflect.Type) reflect.Type {
- if checkOptions(opts, optGoNillable) != nil {
+ if checkOptions(opts, optGoNillableUrn) != nil {
return reflect.PtrTo(t)
}
return nil
@@ -426,7 +454,7 @@ var optGoLogicalType = &pipepb.FieldType{
func logicalOption(lID string) *pipepb.Option {
return &pipepb.Option{
- Name: optGoLogical,
+ Name: optGoLogicalUrn,
Type: optGoLogicalType,
Value: &pipepb.FieldValue{
FieldValue: &pipepb.FieldValue_AtomicValue{
@@ -443,7 +471,7 @@ func logicalOption(lID string) *pipepb.Option {
// fromLogicalOption returns the logical type id of this top
// level type if this schema has a logical equivalent.
func fromLogicalOption(opts []*pipepb.Option) (string, bool) {
- o := checkOptions(opts, optGoLogical)
+ o := checkOptions(opts, optGoLogicalUrn)
if o == nil {
return "", false
}
@@ -489,7 +517,7 @@ func (r *Registry) structToSchema(t reflect.Type)
(*pipepb.Schema, error) {
}
if isAnon {
f = proto.Clone(f).(*pipepb.Field)
- f.Options = append(f.Options, &pipepb.Option{Name:
optGoEmbedded})
+ f.Options = append(f.Options, optGoEmbedded())
}
fields = append(fields, f)
}
@@ -663,7 +691,7 @@ func (r *Registry) toType(s *pipepb.Schema) (reflect.Type,
error) {
if err != nil {
return nil, errors.Wrapf(err, "cannot convert schema
field %v to field", sf.GetName())
}
- if checkOptions(sf.Options, optGoEmbedded) != nil {
+ if checkOptions(sf.Options, optGoEmbeddedUrn) != nil {
rf.Anonymous = true
}
fields = append(fields, rf)
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go
b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go
index b298d43..9fe2132 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go
@@ -404,9 +404,7 @@ func TestSchemaConversion(t *testing.T) {
},
},
},
- Options: []*pipepb.Option{{
- Name: optGoNillable,
- }},
+ Options: []*pipepb.Option{optGoNillable()},
},
rt: reflect.TypeOf(&struct {
SuperNES int16
@@ -530,9 +528,7 @@ func TestSchemaConversion(t *testing.T) {
},
},
},
- Options: []*pipepb.Option{{
- Name: optGoNillable,
- }, logicalOption("*schema.exportedFunc")},
+ Options: []*pipepb.Option{optGoNillable(),
logicalOption("*schema.exportedFunc")},
},
rt: exportedFuncType,
}, {
@@ -568,7 +564,7 @@ func TestSchemaConversion(t *testing.T) {
Fields: []*pipepb.Field{
{
Name: "Exported",
- Options:
[]*pipepb.Option{&pipepb.Option{Name: optGoEmbedded}},
+ Options:
[]*pipepb.Option{optGoEmbedded()},
Type: &pipepb.FieldType{
TypeInfo:
&pipepb.FieldType_RowType{
RowType:
&pipepb.RowType{
@@ -610,7 +606,7 @@ func TestSchemaConversion(t *testing.T) {
Fields: []*pipepb.Field{
{
Name: "Exported",
- Options:
[]*pipepb.Option{&pipepb.Option{Name: optGoEmbedded}},
+ Options:
[]*pipepb.Option{optGoEmbedded()},
Type: &pipepb.FieldType{
Nullable: true,
TypeInfo:
&pipepb.FieldType_RowType{
@@ -660,9 +656,7 @@ func TestSchemaConversion(t *testing.T) {
},
},
},
- Options: []*pipepb.Option{{
- Name: optGoNillable,
- }},
+ Options: []*pipepb.Option{optGoNillable()},
},
rt: reflect.TypeOf(&struct {
myInt
diff --git a/sdks/go/test/regression/lperror.go
b/sdks/go/test/regression/lperror.go
new file mode 100644
index 0000000..4e27f97
--- /dev/null
+++ b/sdks/go/test/regression/lperror.go
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package regression
+
+import (
+ "github.com/apache/beam/sdks/go/pkg/beam"
+)
+
+// REPRO found by https://github.com/zelliott
+
+type fruit struct {
+ Name string
+}
+
+func toFoo(id int, _ func(**fruit) bool) (int, string) {
+ return id, "Foo"
+}
+
+func toID(id int, fruitIter func(**fruit) bool, _ func(*string) bool) int {
+ var fruit *fruit
+ for fruitIter(&fruit) {
+ }
+ return id
+}
+
+// LPErrorPipeline constructs a pipeline that has a GBK followed by a CoGBK using the same
+// input, with schema encoded structs as elements. This ends up having the stage after the
+// CoGBK fail since the decoder post-cogbk is missing a Length Prefix coder that was
+// applied to the GBK input, but not the CoGBK output.
+// The root cause is likely that there's no Beam standard CoGBK format for inject and expand.
+// JIRA: BEAM-12438
+func LPErrorPipeline(s beam.Scope) beam.PCollection {
+ // ["Apple", "Banana", "Cherry"]
+ fruits := beam.CreateList(s, []*fruit{{"Apple"}, {"Banana"},
{"Cherry"}})
+
+ // [0 "Apple", 0 "Banana", 0 "Cherry"]
+ fruitsKV := beam.AddFixedKey(s, fruits)
+
+ // [0 ["Apple", "Banana", "Cherry"]]
+ fruitsGBK := beam.GroupByKey(s, fruitsKV)
+
+ // [0 "Foo"]
+ fooKV := beam.ParDo(s, toFoo, fruitsGBK)
+
+ // [0 ["Foo"] ["Apple", "Banana", "Cherry"]]
+ fruitsFooCoGBK := beam.CoGroupByKey(s, fruitsKV, fooKV)
+
+ // [0]
+ return beam.ParDo(s, toID, fruitsFooCoGBK)
+}
diff --git a/sdks/go/test/regression/pardo_test.go
b/sdks/go/test/regression/lperror_test.go
similarity index 56%
copy from sdks/go/test/regression/pardo_test.go
copy to sdks/go/test/regression/lperror_test.go
index 322dd69..773570d 100644
--- a/sdks/go/test/regression/pardo_test.go
+++ b/sdks/go/test/regression/lperror_test.go
@@ -18,41 +18,23 @@ package regression
import (
"testing"
+ "github.com/apache/beam/sdks/go/pkg/beam"
+ "github.com/apache/beam/sdks/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/go/pkg/beam/testing/ptest"
-)
-
-func TestDirectParDo(t *testing.T) {
- if err := ptest.Run(DirectParDo()); err != nil {
- t.Error(err)
- }
-}
-
-func TestEmitParDo(t *testing.T) {
- if err := ptest.Run(EmitParDo()); err != nil {
- t.Error(err)
- }
-}
+ "github.com/apache/beam/sdks/go/test/integration"
-func TestMultiEmitParDo(t *testing.T) {
- if err := ptest.Run(MultiEmitParDo()); err != nil {
- t.Error(err)
- }
-}
+ _ "github.com/apache/beam/sdks/go/pkg/beam/runners/dataflow"
+ _ "github.com/apache/beam/sdks/go/pkg/beam/runners/flink"
+ _ "github.com/apache/beam/sdks/go/pkg/beam/runners/spark"
+)
-func TestMixedOutputParDo(t *testing.T) {
- if err := ptest.Run(MixedOutputParDo()); err != nil {
- t.Error(err)
- }
-}
+func TestLPErrorPipeline(t *testing.T) {
+ integration.CheckFilters(t)
-func TestDirectParDoAfterGBK(t *testing.T) {
- if err := ptest.Run(DirectParDoAfterGBK()); err != nil {
- t.Error(err)
- }
-}
+ pipeline, s := beam.NewPipelineWithRoot()
+ want := beam.CreateList(s, []int{0})
+ got := LPErrorPipeline(s)
+ passert.Equals(s, got, want)
-func TestEmitParDoAfterGBK(t *testing.T) {
- if err := ptest.Run(EmitParDoAfterGBK()); err != nil {
- t.Error(err)
- }
+ ptest.RunAndValidate(t, pipeline)
}
diff --git a/sdks/go/test/regression/pardo_test.go
b/sdks/go/test/regression/pardo_test.go
index 322dd69..cbcb81c 100644
--- a/sdks/go/test/regression/pardo_test.go
+++ b/sdks/go/test/regression/pardo_test.go
@@ -19,40 +19,39 @@ import (
"testing"
"github.com/apache/beam/sdks/go/pkg/beam/testing/ptest"
+ "github.com/apache/beam/sdks/go/test/integration"
+
+ _ "github.com/apache/beam/sdks/go/pkg/beam/runners/dataflow"
+ _ "github.com/apache/beam/sdks/go/pkg/beam/runners/flink"
+ _ "github.com/apache/beam/sdks/go/pkg/beam/runners/spark"
)
func TestDirectParDo(t *testing.T) {
- if err := ptest.Run(DirectParDo()); err != nil {
- t.Error(err)
- }
+ integration.CheckFilters(t)
+ ptest.RunAndValidate(t, DirectParDo())
}
func TestEmitParDo(t *testing.T) {
- if err := ptest.Run(EmitParDo()); err != nil {
- t.Error(err)
- }
+ integration.CheckFilters(t)
+ ptest.RunAndValidate(t, EmitParDo())
}
func TestMultiEmitParDo(t *testing.T) {
- if err := ptest.Run(MultiEmitParDo()); err != nil {
- t.Error(err)
- }
+ integration.CheckFilters(t)
+ ptest.RunAndValidate(t, MultiEmitParDo())
}
func TestMixedOutputParDo(t *testing.T) {
- if err := ptest.Run(MixedOutputParDo()); err != nil {
- t.Error(err)
- }
+ integration.CheckFilters(t)
+ ptest.RunAndValidate(t, MixedOutputParDo())
}
func TestDirectParDoAfterGBK(t *testing.T) {
- if err := ptest.Run(DirectParDoAfterGBK()); err != nil {
- t.Error(err)
- }
+ integration.CheckFilters(t)
+ ptest.RunAndValidate(t, DirectParDoAfterGBK())
}
func TestEmitParDoAfterGBK(t *testing.T) {
- if err := ptest.Run(EmitParDoAfterGBK()); err != nil {
- t.Error(err)
- }
+ integration.CheckFilters(t)
+ ptest.RunAndValidate(t, EmitParDoAfterGBK())
}
diff --git a/sdks/go/test/regression/pardo_test.go
b/sdks/go/test/regression/regression_test.go
similarity index 56%
copy from sdks/go/test/regression/pardo_test.go
copy to sdks/go/test/regression/regression_test.go
index 322dd69..132e9a8 100644
--- a/sdks/go/test/regression/pardo_test.go
+++ b/sdks/go/test/regression/regression_test.go
@@ -21,38 +21,8 @@ import (
"github.com/apache/beam/sdks/go/pkg/beam/testing/ptest"
)
-func TestDirectParDo(t *testing.T) {
- if err := ptest.Run(DirectParDo()); err != nil {
- t.Error(err)
- }
-}
-
-func TestEmitParDo(t *testing.T) {
- if err := ptest.Run(EmitParDo()); err != nil {
- t.Error(err)
- }
-}
-
-func TestMultiEmitParDo(t *testing.T) {
- if err := ptest.Run(MultiEmitParDo()); err != nil {
- t.Error(err)
- }
-}
-
-func TestMixedOutputParDo(t *testing.T) {
- if err := ptest.Run(MixedOutputParDo()); err != nil {
- t.Error(err)
- }
-}
-
-func TestDirectParDoAfterGBK(t *testing.T) {
- if err := ptest.Run(DirectParDoAfterGBK()); err != nil {
- t.Error(err)
- }
-}
-
-func TestEmitParDoAfterGBK(t *testing.T) {
- if err := ptest.Run(EmitParDoAfterGBK()); err != nil {
- t.Error(err)
- }
+// TestMain invokes ptest.Main to allow running these tests on
+// non-direct runners.
+func TestMain(m *testing.M) {
+ ptest.Main(m)
}
diff --git a/sdks/go/test/run_validatesrunner_tests.sh
b/sdks/go/test/run_validatesrunner_tests.sh
index a43898e..00ec453 100755
--- a/sdks/go/test/run_validatesrunner_tests.sh
+++ b/sdks/go/test/run_validatesrunner_tests.sh
@@ -310,11 +310,11 @@ if [[ "$JENKINS" == true ]]; then
cd ./src
echo ">>> RUNNING $RUNNER VALIDATESRUNNER TESTS"
- GOPATH=$TEMP_GOPATH go test -v
github.com/apache/beam/sdks/go/test/integration/... $ARGS \
+ GOPATH=$TEMP_GOPATH go test -v
github.com/apache/beam/sdks/go/test/integration/...
github.com/apache/beam/sdks/go/test/regression $ARGS \
|| TEST_EXIT_CODE=$? # don't fail fast here; clean up environment before
exiting
else
echo ">>> RUNNING $RUNNER VALIDATESRUNNER TESTS"
- go test -v ./sdks/go/test/integration/... $ARGS \
+ go test -v ./sdks/go/test/integration/... ./sdks/go/test/regression $ARGS \
|| TEST_EXIT_CODE=$? # don't fail fast here; clean up environment before
exiting
fi