damondouglas commented on code in PR #30492:
URL: https://github.com/apache/beam/pull/30492#discussion_r1618025133
##########
sdks/go/pkg/beam/runners/prism/internal/stage.go:
##########
@@ -55,15 +55,16 @@ type link struct {
// account, but all serialization boundaries remain since the pcollections
// would continue to get serialized.
type stage struct {
- ID string
- transforms []string
- primaryInput string // PCollection used as the parallel input.
- outputs []link // PCollections that must escape this stage.
- sideInputs []engine.LinkID // Non-parallel input PCollections and their consumers
- internalCols []string // PCollections that escape. Used for precise coder sending.
- envID string
- stateful bool
- hasTimers []string
+ ID string
+ transforms []string
+ primaryInput string // PCollection used as the parallel input.
+ outputs []link // PCollections that must escape this stage.
+ sideInputs []engine.LinkID // Non-parallel input PCollections and their consumers
+ internalCols []string // PCollections that escape. Used for precise coder sending.
+ envID string
+ stateful bool
+ hasTimers []string
+ processingTimeTimers map[string]bool
Review Comment:
Is
https://github.com/lostluck/beam/blob/beam30083ProcessingTimme/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go#L843
the reason we only have a map of processing-time timers and not one for
event-time timers?
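For context on the question, here is one possible reading of why only processing-time timers might need a dedicated map. It is an assumption, not something confirmed by the linked code: event-time timers fire as the input watermark advances, so if event time is treated as the default domain, a stage only needs to remember which timer families are in the processing-time domain in order to route them to wall-clock scheduling. A minimal Go sketch with hypothetical names (`TimeDomain`, `timerFamily`, `buildProcessingTimeSet`), none of which are prism's actual types:

```go
package main

import "fmt"

// TimeDomain is a hypothetical stand-in for a timer's time domain.
// It is NOT prism's actual type.
type TimeDomain int

const (
	EventTime TimeDomain = iota
	ProcessingTime
)

// timerFamily is a hypothetical description of one timer family on a transform.
type timerFamily struct {
	ID     string
	Domain TimeDomain
}

// buildProcessingTimeSet returns the set of timer family IDs that are in the
// processing-time domain. Event-time families are omitted because, under this
// sketch's assumption, they are handled by the default watermark logic.
func buildProcessingTimeSet(families []timerFamily) map[string]bool {
	pt := map[string]bool{}
	for _, f := range families {
		if f.Domain == ProcessingTime {
			pt[f.ID] = true
		}
	}
	return pt
}

func main() {
	fams := []timerFamily{
		{ID: "eventTimer", Domain: EventTime},
		{ID: "procTimer", Domain: ProcessingTime},
	}
	pt := buildProcessingTimeSet(fams)
	fmt.Println(pt["procTimer"], pt["eventTimer"]) // true false
}
```

With this shape, a lookup that misses (`pt[id] == false`) simply means "use the default event-time handling", which is why a single set can be enough.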
##########
sdks/go/pkg/beam/runners/prism/internal/engine/engine_test.go:
##########
@@ -221,3 +221,46 @@ func TestTestStream(t *testing.T) {
}
}
}
+
+// TestProcessingTime is the suite for validating behaviors around ProcessingTime.
+// Separate from the TestStream, Timers, and Triggers tests due to the unique nature
+// of the time domain.
+func TestProcessingTime(t *testing.T) {
+ initRunner(t)
+
+ tests := []struct {
+ pipeline func(s beam.Scope)
+ }{
+ {pipeline: primitives.TimersProcessingTimeTestStream_Infinity},
+ {pipeline: primitives.TimersProcessingTime_Bounded},
+ {pipeline: primitives.TimersProcessingTime_Unbounded},
+ }
+
+ configs := []struct {
+ name string
+ OneElementPerKey, OneKeyPerBundle bool
+ }{
+ {"Greedy", false, false},
+ {"AllElementsPerKey", false, true},
+ {"OneElementPerKey", true, false},
+ // {"OneElementPerBundle", true, true}, // Reveals flaky behavior
Review Comment:
Not PR-blocking, and I'm not sure whether this might become a problem for the
AllElementsPerKey and OneElementPerKey cases in the future. I commented out
this flaky OneElementPerBundle case and inserted log statements after
https://github.com/lostluck/beam/blob/beam30083ProcessingTimme/sdks/go/test/integration/primitives/timers.go#L197
and after
https://github.com/lostluck/beam/blob/beam30083ProcessingTimme/sdks/go/test/integration/primitives/timers.go#L210.
I observed that the key associated with the panic at
https://github.com/lostluck/beam/blob/beam30083ProcessingTimme/sdks/go/test/integration/primitives/timers.go#L213
never appeared in either of those logged steps. I haven't yet figured out
why, but wanted to relay my findings.
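The debugging approach described above (log keys at two points in the DoFn, then check whether the panicking key appeared in either log) could be turned into an assertion with a small concurrency-safe recorder. This is a hypothetical helper, not part of Beam's test suite; the step names `afterL197` and `afterL210` are placeholders for the two logged locations in timers.go, and the sketch assumes the recording code and the check run in the same process.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// keyRecorder records which keys were observed at each named step.
// Hypothetical debugging helper; safe for concurrent use by parallel bundles.
type keyRecorder struct {
	mu   sync.Mutex
	seen map[string]map[string]bool // step -> set of keys
}

func newKeyRecorder() *keyRecorder {
	return &keyRecorder{seen: map[string]map[string]bool{}}
}

// Record notes that key was observed at step.
func (r *keyRecorder) Record(step, key string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.seen[step] == nil {
		r.seen[step] = map[string]bool{}
	}
	r.seen[step][key] = true
}

// StepsSeen returns, sorted, the steps at which key was observed.
func (r *keyRecorder) StepsSeen(key string) []string {
	r.mu.Lock()
	defer r.mu.Unlock()
	var steps []string
	for step, keys := range r.seen {
		if keys[key] {
			steps = append(steps, step)
		}
	}
	sort.Strings(steps)
	return steps
}

func main() {
	r := newKeyRecorder()
	r.Record("afterL197", "key0")
	r.Record("afterL210", "key0")
	fmt.Println(r.StepsSeen("key0")) // [afterL197 afterL210]
	fmt.Println(r.StepsSeen("key7")) // []
}
```

A panicking key for which `StepsSeen` returns an empty slice would match the symptom reported here, and calling it from the panic path would surface that automatically instead of requiring a manual log comparison.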
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -305,8 +312,13 @@ func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string)
for {
em.refreshCond.L.Lock()
+ // Check if processing time has advanced before the wait loop.
+ emNow := em.ProcessingTimeNow()
+ ptRefreshed := em.processTimeEvents.AdvanceTo(emNow)
+ em.watermarkRefreshes.merge(ptRefreshed)
+
// If there are no watermark refreshes available, we wait until there are.
- for len(em.watermarkRefreshes) == 0 {
+ for len(em.watermarkRefreshes)+len(ptRefreshed) == 0 {
// TODO Add processing time event condition instead of piggybacking on watermarks?
Review Comment:
Not PR-blocking, but I'm curious what "processing time event condition" means,
especially with respect to the "instead of" in the TODO.
--
This is an automated message from the Apache Git Service.