shunping commented on code in PR #38523:
URL: https://github.com/apache/beam/pull/38523#discussion_r3278085109
##########
sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go:
##########
@@ -404,3 +405,37 @@ func (fn *selfCheckpointingDoFn) ProcessElement(rt *sdf.LockRTracker, _ []byte,
}
}
}
+
+type errorSplitTracker struct {
+ *offsetrange.Tracker
+}
+
+func (t *errorSplitTracker) TrySplit(fraction float64) (any, any, error) {
+ return nil, nil, fmt.Errorf("intentional split error from tracker")
+}
+
+type slowFailSDF struct{}
+
+func (fn *slowFailSDF) CreateInitialRestriction(config SourceConfig) offsetrange.Restriction {
+ return offsetrange.Restriction{Start: 0, End: config.NumElements}
+}
+
+func (fn *slowFailSDF) SplitRestriction(config SourceConfig, rest offsetrange.Restriction) []offsetrange.Restriction {
+ return rest.EvenSplits(config.InitialSplits)
+}
+
+func (fn *slowFailSDF) RestrictionSize(_ SourceConfig, rest offsetrange.Restriction) float64 {
+ return rest.Size()
+}
+
+func (fn *slowFailSDF) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
+ return sdf.NewLockRTracker(&errorSplitTracker{Tracker: offsetrange.NewTracker(rest)})
+}
+
+func (fn *slowFailSDF) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, config SourceConfig, emit func(int64)) error {
+ fmt.Println("DEBUG: slowFailSDF.ProcessElement invoked")
Review Comment:
Fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]