This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8944cb9 [BEAM-13836] Fix the answers placeholders locations in the
Python katas
new 0148d30 Merge pull request #16820 from [BEAM-13836] Fix the answers
placeholders locations in the Python katas
8944cb9 is described below
commit 8944cb93f1e0f69c9679f396d6ea85597556e052
Author: Israel Herraiz <[email protected]>
AuthorDate: Tue Feb 1 20:00:00 2022 +0100
[BEAM-13836] Fix the answers placeholders locations in the Python katas
---
.../Aggregation/Count/task-info.yaml | 2 +-
.../Aggregation/Largest/task-info.yaml | 2 +-
.../Aggregation/Mean/task-info.yaml | 2 +-
.../Aggregation/Smallest/task-info.yaml | 2 +-
.../Aggregation/Sum/task-info.yaml | 2 +-
.../Common Transforms/Filter/Filter/task-info.yaml | 2 +-
.../Common Transforms/Filter/ParDo/task-info.yaml | 7 +-
.../WithKeys/WithKeys/task-info.yaml | 2 +-
.../Common Transforms/WithKeys/WithKeys/task.py | 2 +-
.../Branching/Branching/task-info.yaml | 4 +-
.../CoGroupByKey/CoGroupByKey/task-info.yaml | 4 +-
.../Combine/Combine PerKey/task-info.yaml | 2 +-
.../Combine/CombineFn/task-info.yaml | 6 +-
.../Combine/Simple Function/task-info.yaml | 4 +-
.../Composite Transform/task-info.yaml | 6 +-
.../Core Transforms/Flatten/Flatten/task-info.yaml | 4 +-
.../GroupByKey/GroupByKey/task-info.yaml | 6 +-
.../Core Transforms/Map/FlatMap/task-info.yaml | 2 +-
.../python/Core Transforms/Map/Map/task-info.yaml | 2 +-
.../Map/ParDo OneToMany/task-info.yaml | 6 +-
.../Core Transforms/Map/ParDo/task-info.yaml | 6 +-
.../Partition/Partition/task-info.yaml | 6 +-
.../Side Input/Side Input/task-info.yaml | 6 +-
.../Core Transforms/Side Input/Side Input/task.py | 8 --
.../Side Output/Side Output/task-info.yaml | 6 +-
.../Examples/Word Count/Word Count/task-info.yaml | 4 +-
.../python/IO/TextIO/ReadFromText/task-info.yaml | 4 +-
.../katas/python/IO/TextIO/ReadFromText/task.py | 8 ++
.../Hello Beam/Hello Beam/task-info.yaml | 4 +-
.../Early Triggers/Early Triggers/task-info.yaml | 6 +-
.../Triggers/Early Triggers/Early Triggers/task.py | 34 ++++----
.../Early Triggers/Early Triggers/tests.py | 90 ++++++++--------------
.../Event Time Triggers/task-info.yaml | 4 +-
.../Event Time Triggers/task.py | 41 +++++-----
.../Event Time Triggers/tests.py | 76 ++++--------------
.../Window Accumulation Mode/task-info.yaml | 6 +-
.../Window Accumulation Mode/task.py | 31 ++++----
.../Window Accumulation Mode/tests.py | 88 +++++++--------------
.../Adding Timestamp/ParDo/task-info.yaml | 8 +-
.../Fixed Time Window/task-info.yaml | 2 +-
learning/katas/python/requirements.txt | 6 +-
41 files changed, 209 insertions(+), 304 deletions(-)
diff --git a/learning/katas/python/Common
Transforms/Aggregation/Count/task-info.yaml b/learning/katas/python/Common
Transforms/Aggregation/Count/task-info.yaml
index 0681008..99a6d4f 100644
--- a/learning/katas/python/Common Transforms/Aggregation/Count/task-info.yaml
+++ b/learning/katas/python/Common Transforms/Aggregation/Count/task-info.yaml
@@ -22,7 +22,7 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 945
+ - offset: 1134
length: 31
placeholder_text: TODO()
- name: tests.py
diff --git a/learning/katas/python/Common
Transforms/Aggregation/Largest/task-info.yaml b/learning/katas/python/Common
Transforms/Aggregation/Largest/task-info.yaml
index 9b00391..0268222 100644
--- a/learning/katas/python/Common
Transforms/Aggregation/Largest/task-info.yaml
+++ b/learning/katas/python/Common
Transforms/Aggregation/Largest/task-info.yaml
@@ -22,7 +22,7 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 945
+ - offset: 1150
length: 29
placeholder_text: TODO()
- name: tests.py
diff --git a/learning/katas/python/Common
Transforms/Aggregation/Mean/task-info.yaml b/learning/katas/python/Common
Transforms/Aggregation/Mean/task-info.yaml
index 22c5db3..b401345 100644
--- a/learning/katas/python/Common Transforms/Aggregation/Mean/task-info.yaml
+++ b/learning/katas/python/Common Transforms/Aggregation/Mean/task-info.yaml
@@ -22,7 +22,7 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 945
+ - offset: 1156
length: 30
placeholder_text: TODO()
- name: tests.py
diff --git a/learning/katas/python/Common
Transforms/Aggregation/Smallest/task-info.yaml b/learning/katas/python/Common
Transforms/Aggregation/Smallest/task-info.yaml
index 22c5db3..44afe84 100644
--- a/learning/katas/python/Common
Transforms/Aggregation/Smallest/task-info.yaml
+++ b/learning/katas/python/Common
Transforms/Aggregation/Smallest/task-info.yaml
@@ -22,7 +22,7 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 945
+ - offset: 1141
length: 30
placeholder_text: TODO()
- name: tests.py
diff --git a/learning/katas/python/Common
Transforms/Aggregation/Sum/task-info.yaml b/learning/katas/python/Common
Transforms/Aggregation/Sum/task-info.yaml
index 31213b8..d909a48 100644
--- a/learning/katas/python/Common Transforms/Aggregation/Sum/task-info.yaml
+++ b/learning/katas/python/Common Transforms/Aggregation/Sum/task-info.yaml
@@ -22,7 +22,7 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 945
+ - offset: 1135
length: 25
placeholder_text: TODO()
- name: tests.py
diff --git a/learning/katas/python/Common
Transforms/Filter/Filter/task-info.yaml b/learning/katas/python/Common
Transforms/Filter/Filter/task-info.yaml
index 78b7c1d..b2cdd32 100644
--- a/learning/katas/python/Common Transforms/Filter/Filter/task-info.yaml
+++ b/learning/katas/python/Common Transforms/Filter/Filter/task-info.yaml
@@ -22,7 +22,7 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 945
+ - offset: 1152
length: 37
placeholder_text: TODO()
- name: tests.py
diff --git a/learning/katas/python/Common
Transforms/Filter/ParDo/task-info.yaml b/learning/katas/python/Common
Transforms/Filter/ParDo/task-info.yaml
index aff611a..c63886d 100644
--- a/learning/katas/python/Common Transforms/Filter/ParDo/task-info.yaml
+++ b/learning/katas/python/Common Transforms/Filter/ParDo/task-info.yaml
@@ -22,8 +22,11 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 921
- length: 82
+ - offset: 1290
+ length: 33
+ placeholder_text: TODO()
+ - offset: 1133
+ length: 87
placeholder_text: TODO()
- name: tests.py
visible: false
diff --git a/learning/katas/python/Common
Transforms/WithKeys/WithKeys/task-info.yaml b/learning/katas/python/Common
Transforms/WithKeys/WithKeys/task-info.yaml
index 7708d24..c26ccfa 100644
--- a/learning/katas/python/Common Transforms/WithKeys/WithKeys/task-info.yaml
+++ b/learning/katas/python/Common Transforms/WithKeys/WithKeys/task-info.yaml
@@ -22,7 +22,7 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 977
+ - offset: 1200
length: 37
placeholder_text: TODO()
- name: tests.py
diff --git a/learning/katas/python/Common Transforms/WithKeys/WithKeys/task.py
b/learning/katas/python/Common Transforms/WithKeys/WithKeys/task.py
index 41ab498..27a0cc5 100644
--- a/learning/katas/python/Common Transforms/WithKeys/WithKeys/task.py
+++ b/learning/katas/python/Common Transforms/WithKeys/WithKeys/task.py
@@ -20,7 +20,7 @@
# multifile: false
# context_line: 29
# categories:
-# - Combiners
+# - Core Transforms
import apache_beam as beam
diff --git a/learning/katas/python/Core
Transforms/Branching/Branching/task-info.yaml b/learning/katas/python/Core
Transforms/Branching/Branching/task-info.yaml
index 13d0ab0..857ca3b 100644
--- a/learning/katas/python/Core Transforms/Branching/Branching/task-info.yaml
+++ b/learning/katas/python/Core Transforms/Branching/Branching/task-info.yaml
@@ -22,10 +22,10 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 956
+ - offset: 1295
length: 39
placeholder_text: TODO()
- - offset: 1015
+ - offset: 1354
length: 40
placeholder_text: TODO()
- name: tests.py
diff --git a/learning/katas/python/Core
Transforms/CoGroupByKey/CoGroupByKey/task-info.yaml
b/learning/katas/python/Core Transforms/CoGroupByKey/CoGroupByKey/task-info.yaml
index 3e192e2..52a393d 100644
--- a/learning/katas/python/Core
Transforms/CoGroupByKey/CoGroupByKey/task-info.yaml
+++ b/learning/katas/python/Core
Transforms/CoGroupByKey/CoGroupByKey/task-info.yaml
@@ -22,8 +22,8 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 1228
- length: 541
+ - offset: 1542
+ length: 545
placeholder_text: TODO()
- name: tests.py
visible: false
diff --git a/learning/katas/python/Core Transforms/Combine/Combine
PerKey/task-info.yaml b/learning/katas/python/Core Transforms/Combine/Combine
PerKey/task-info.yaml
index 5025294..a5bd8f8 100644
--- a/learning/katas/python/Core Transforms/Combine/Combine
PerKey/task-info.yaml
+++ b/learning/katas/python/Core Transforms/Combine/Combine
PerKey/task-info.yaml
@@ -22,7 +22,7 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 1101
+ - offset: 1303
length: 23
placeholder_text: TODO()
- name: tests.py
diff --git a/learning/katas/python/Core
Transforms/Combine/CombineFn/task-info.yaml b/learning/katas/python/Core
Transforms/Combine/CombineFn/task-info.yaml
index 75c8d17..8a8dafa 100644
--- a/learning/katas/python/Core Transforms/Combine/CombineFn/task-info.yaml
+++ b/learning/katas/python/Core Transforms/Combine/CombineFn/task-info.yaml
@@ -22,10 +22,10 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 916
- length: 436
+ - offset: 1071
+ length: 441
placeholder_text: TODO()
- - offset: 1431
+ - offset: 1591
length: 33
placeholder_text: TODO()
- name: tests.py
diff --git a/learning/katas/python/Core Transforms/Combine/Simple
Function/task-info.yaml b/learning/katas/python/Core Transforms/Combine/Simple
Function/task-info.yaml
index a3f9c3f..0800d5e 100644
--- a/learning/katas/python/Core Transforms/Combine/Simple
Function/task-info.yaml
+++ b/learning/katas/python/Core Transforms/Combine/Simple
Function/task-info.yaml
@@ -22,10 +22,10 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 900
+ - offset: 1100
length: 73
placeholder_text: TODO()
- - offset: 1047
+ - offset: 1247
length: 25
placeholder_text: TODO()
- name: tests.py
diff --git a/learning/katas/python/Core Transforms/Composite
Transform/Composite Transform/task-info.yaml b/learning/katas/python/Core
Transforms/Composite Transform/Composite Transform/task-info.yaml
index aae12cd..9a9cd4f 100644
--- a/learning/katas/python/Core Transforms/Composite Transform/Composite
Transform/task-info.yaml
+++ b/learning/katas/python/Core Transforms/Composite Transform/Composite
Transform/task-info.yaml
@@ -22,10 +22,10 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 920
- length: 184
+ - offset: 1227
+ length: 189
placeholder_text: TODO()
- - offset: 1190
+ - offset: 1502
length: 27
placeholder_text: TODO()
- name: tests.py
diff --git a/learning/katas/python/Core
Transforms/Flatten/Flatten/task-info.yaml b/learning/katas/python/Core
Transforms/Flatten/Flatten/task-info.yaml
index c9a5071..9103c3e 100644
--- a/learning/katas/python/Core Transforms/Flatten/Flatten/task-info.yaml
+++ b/learning/katas/python/Core Transforms/Flatten/Flatten/task-info.yaml
@@ -22,8 +22,8 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 1159
- length: 14
+ - offset: 1320
+ length: 63
placeholder_text: TODO()
- name: tests.py
visible: false
diff --git a/learning/katas/python/Core
Transforms/GroupByKey/GroupByKey/task-info.yaml b/learning/katas/python/Core
Transforms/GroupByKey/GroupByKey/task-info.yaml
index 98eb868..a19a82c 100644
--- a/learning/katas/python/Core
Transforms/GroupByKey/GroupByKey/task-info.yaml
+++ b/learning/katas/python/Core
Transforms/GroupByKey/GroupByKey/task-info.yaml
@@ -22,8 +22,8 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 981
- length: 65
- placeholder_text: '| TODO()'
+ - offset: 1172
+ length: 63
+ placeholder_text: TODO()
- name: tests.py
visible: false
diff --git a/learning/katas/python/Core Transforms/Map/FlatMap/task-info.yaml
b/learning/katas/python/Core Transforms/Map/FlatMap/task-info.yaml
index 1e50818..8adaec4 100644
--- a/learning/katas/python/Core Transforms/Map/FlatMap/task-info.yaml
+++ b/learning/katas/python/Core Transforms/Map/FlatMap/task-info.yaml
@@ -22,7 +22,7 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 979
+ - offset: 1230
length: 47
placeholder_text: TODO()
- name: tests.py
diff --git a/learning/katas/python/Core Transforms/Map/Map/task-info.yaml
b/learning/katas/python/Core Transforms/Map/Map/task-info.yaml
index d1c5cf1..a442f52 100644
--- a/learning/katas/python/Core Transforms/Map/Map/task-info.yaml
+++ b/learning/katas/python/Core Transforms/Map/Map/task-info.yaml
@@ -22,7 +22,7 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 953
+ - offset: 1178
length: 29
placeholder_text: TODO()
- name: tests.py
diff --git a/learning/katas/python/Core Transforms/Map/ParDo
OneToMany/task-info.yaml b/learning/katas/python/Core Transforms/Map/ParDo
OneToMany/task-info.yaml
index c52cd63..1eb6240 100644
--- a/learning/katas/python/Core Transforms/Map/ParDo OneToMany/task-info.yaml
+++ b/learning/katas/python/Core Transforms/Map/ParDo OneToMany/task-info.yaml
@@ -22,10 +22,10 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 920
- length: 58
+ - offset: 1166
+ length: 63
placeholder_text: TODO()
- - offset: 1068
+ - offset: 1319
length: 32
placeholder_text: TODO()
- name: tests.py
diff --git a/learning/katas/python/Core Transforms/Map/ParDo/task-info.yaml
b/learning/katas/python/Core Transforms/Map/ParDo/task-info.yaml
index a2c9191..c5353ee 100644
--- a/learning/katas/python/Core Transforms/Map/ParDo/task-info.yaml
+++ b/learning/katas/python/Core Transforms/Map/ParDo/task-info.yaml
@@ -22,10 +22,10 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 919
- length: 54
+ - offset: 1137
+ length: 59
placeholder_text: TODO()
- - offset: 1047
+ - offset: 1270
length: 31
placeholder_text: TODO()
- name: tests.py
diff --git a/learning/katas/python/Core
Transforms/Partition/Partition/task-info.yaml b/learning/katas/python/Core
Transforms/Partition/Partition/task-info.yaml
index c63ee3c..a12f21f 100644
--- a/learning/katas/python/Core Transforms/Partition/Partition/task-info.yaml
+++ b/learning/katas/python/Core Transforms/Partition/Partition/task-info.yaml
@@ -22,11 +22,11 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 924
+ - offset: 1262
length: 60
placeholder_text: TODO()
- - offset: 1100
- length: 31
+ - offset: 1437
+ length: 33
placeholder_text: TODO()
- name: tests.py
visible: false
diff --git a/learning/katas/python/Core Transforms/Side Input/Side
Input/task-info.yaml b/learning/katas/python/Core Transforms/Side Input/Side
Input/task-info.yaml
index be86fee..b6cbaf7 100644
--- a/learning/katas/python/Core Transforms/Side Input/Side
Input/task-info.yaml
+++ b/learning/katas/python/Core Transforms/Side Input/Side
Input/task-info.yaml
@@ -22,10 +22,10 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 1534
- length: 153
+ - offset: 1378
+ length: 158
placeholder_text: TODO()
- - offset: 2135
+ - offset: 1984
length: 52
placeholder_text: TODO()
- name: tests.py
diff --git a/learning/katas/python/Core Transforms/Side Input/Side
Input/task.py b/learning/katas/python/Core Transforms/Side Input/Side
Input/task.py
index 39a3f6e3..8b79c9c 100644
--- a/learning/katas/python/Core Transforms/Side Input/Side Input/task.py
+++ b/learning/katas/python/Core Transforms/Side Input/Side Input/task.py
@@ -13,14 +13,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
# beam-playground:
# name: SideInput
diff --git a/learning/katas/python/Core Transforms/Side Output/Side
Output/task-info.yaml b/learning/katas/python/Core Transforms/Side Output/Side
Output/task-info.yaml
index 025f105..16f5aef 100644
--- a/learning/katas/python/Core Transforms/Side Output/Side
Output/task-info.yaml
+++ b/learning/katas/python/Core Transforms/Side Output/Side
Output/task-info.yaml
@@ -22,10 +22,10 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 1011
- length: 160
+ - offset: 1255
+ length: 165
placeholder_text: TODO()
- - offset: 1277
+ - offset: 1526
length: 100
placeholder_text: TODO()
- name: tests.py
diff --git a/learning/katas/python/Examples/Word Count/Word
Count/task-info.yaml b/learning/katas/python/Examples/Word Count/Word
Count/task-info.yaml
index 6370180..6cc94cb 100644
--- a/learning/katas/python/Examples/Word Count/Word Count/task-info.yaml
+++ b/learning/katas/python/Examples/Word Count/Word Count/task-info.yaml
@@ -22,8 +22,8 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 1032
- length: 140
+ - offset: 1228
+ length: 147
placeholder_text: TODO()
- name: tests.py
visible: false
diff --git a/learning/katas/python/IO/TextIO/ReadFromText/task-info.yaml
b/learning/katas/python/IO/TextIO/ReadFromText/task-info.yaml
index 6a322af..c89ce4e 100644
--- a/learning/katas/python/IO/TextIO/ReadFromText/task-info.yaml
+++ b/learning/katas/python/IO/TextIO/ReadFromText/task-info.yaml
@@ -22,10 +22,10 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 930
+ - offset: 1100
length: 31
placeholder_text: TODO()
- - offset: 969
+ - offset: 1139
length: 41
placeholder_text: TODO()
- name: tests.py
diff --git a/learning/katas/python/IO/TextIO/ReadFromText/task.py
b/learning/katas/python/IO/TextIO/ReadFromText/task.py
index ab04e1d..e3e8def 100644
--- a/learning/katas/python/IO/TextIO/ReadFromText/task.py
+++ b/learning/katas/python/IO/TextIO/ReadFromText/task.py
@@ -14,6 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+# beam-playground:
+# name: ReadFromText
+# description: Task from katas to read from text files.
+# multifile: false
+# context_line: 29
+# categories:
+# - IO
+
import apache_beam as beam
from log_elements import LogElements
diff --git a/learning/katas/python/Introduction/Hello Beam/Hello
Beam/task-info.yaml b/learning/katas/python/Introduction/Hello Beam/Hello
Beam/task-info.yaml
index 8f9d26b..2e22dce 100644
--- a/learning/katas/python/Introduction/Hello Beam/Hello Beam/task-info.yaml
+++ b/learning/katas/python/Introduction/Hello Beam/Hello Beam/task-info.yaml
@@ -22,8 +22,8 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 912
- length: 27
+ - offset: 1157
+ length: 26
placeholder_text: TODO()
- name: tests.py
visible: false
diff --git a/learning/katas/python/Triggers/Early Triggers/Early
Triggers/task-info.yaml b/learning/katas/python/Triggers/Early Triggers/Early
Triggers/task-info.yaml
index 9134934..697a581 100644
--- a/learning/katas/python/Triggers/Early Triggers/Early
Triggers/task-info.yaml
+++ b/learning/katas/python/Triggers/Early Triggers/Early
Triggers/task-info.yaml
@@ -22,9 +22,9 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 1342
- length: 387
- placeholder_text: TODO()
+ - offset: 1545
+ length: 431
+ placeholder_text: TODO()
- name: tests.py
visible: false
- name: generate_event.py
diff --git a/learning/katas/python/Triggers/Early Triggers/Early
Triggers/task.py b/learning/katas/python/Triggers/Early Triggers/Early
Triggers/task.py
index 2d30275..0c27428 100644
--- a/learning/katas/python/Triggers/Early Triggers/Early Triggers/task.py
+++ b/learning/katas/python/Triggers/Early Triggers/Early Triggers/task.py
@@ -17,35 +17,41 @@
# under the License.
#
+# beam-playground:
+# name: EarlyTriggers
+# description: Task from katas to count events using early triggers
+# multifile: true
+# context_line: 46
+# categories:
+# - Streaming
+
import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import StandardOptions
+
from generate_event import GenerateEvent
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterWatermark
from apache_beam.transforms.trigger import AfterCount
from apache_beam.transforms.trigger import AccumulationMode
from apache_beam.utils.timestamp import Duration
-from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.options.pipeline_options import StandardOptions
from log_elements import LogElements
-def apply_transform(events):
+class CountEventsWithEarlyTrigger(beam.PTransform):
+ def expand(self, events):
return (events
- | beam.WindowInto(FixedWindows(1*24*60*60), # 1 Day Window
+ | beam.WindowInto(FixedWindows(1 * 24 * 60 * 60), # 1 Day Window
trigger=AfterWatermark(early=AfterCount(1)),
accumulation_mode=AccumulationMode.DISCARDING,
allowed_lateness=Duration(seconds=0))
| beam.CombineGlobally(beam.combiners.CountCombineFn()).without_defaults())
-def main():
- options = PipelineOptions()
- options.view_as(StandardOptions).streaming = True
- with beam.Pipeline(options=options) as p:
- events = p | GenerateEvent.sample_data()
- output = apply_transform(events)
- output | LogElements(with_window=True)
-
+options = PipelineOptions()
+options.view_as(StandardOptions).streaming = True # Required to get multiple trigger firing outputs
-if __name__ == "__main__":
- main()
+with beam.Pipeline(options=options) as p:
+ (p | GenerateEvent.sample_data()
+ | CountEventsWithEarlyTrigger()
+ | LogElements(with_window=True))
diff --git a/learning/katas/python/Triggers/Early Triggers/Early
Triggers/tests.py b/learning/katas/python/Triggers/Early Triggers/Early
Triggers/tests.py
index 0ff32af..238db70 100644
--- a/learning/katas/python/Triggers/Early Triggers/Early Triggers/tests.py
+++ b/learning/katas/python/Triggers/Early Triggers/Early Triggers/tests.py
@@ -13,70 +13,42 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-import pytz
from datetime import datetime
-from task import apply_transform
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.test_stream import TestStream
-from apache_beam.transforms import window
-from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.options.pipeline_options import StandardOptions
-from apache_beam.testing.util import assert_that, equal_to, equal_to_per_window
+
from test_helper import failed, passed, get_file_output, test_is_not_empty
def test_output():
- options = PipelineOptions()
- options.view_as(StandardOptions).streaming = True
- test_pipeline = TestPipeline(options=options)
-
- events = ( test_pipeline
- | TestStream()
- .add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 1, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 2, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 3, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 4, 0, tzinfo=pytz.UTC).timestamp())
- .advance_watermark_to(datetime(2021, 3, 1, 0, 0, 5, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 5, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 6, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 7, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 8, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 9, 0, tzinfo=pytz.UTC).timestamp())
- .advance_watermark_to(datetime(2021, 3, 1, 0, 0, 10, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 10, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 11, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 12, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 13, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 14, 0, tzinfo=pytz.UTC).timestamp())
- .advance_watermark_to(datetime(2021, 3, 1, 0, 0, 15, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 15, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 16, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 17, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 18, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 19, 0, tzinfo=pytz.UTC).timestamp())
- .advance_watermark_to(datetime(2021, 3, 1, 0, 0, 20, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 20, 0, tzinfo=pytz.UTC).timestamp())
- .advance_watermark_to(datetime(2021, 3, 1, 0, 0, 25, 0, tzinfo=pytz.UTC).timestamp())
- .advance_watermark_to_infinity())
-
- results = apply_transform(events)
-
- answers = {
- window.IntervalWindow(datetime(2021, 3, 1, 0, 0, 0, 0, tzinfo=pytz.UTC).timestamp(),
- datetime(2021, 3, 2, 0, 0, 0, 0, tzinfo=pytz.UTC).timestamp()): [1, 1, 1, 1, 1,
- 1, 1, 1, 1, 1,
- 1, 1, 1, 1, 1,
- 1, 1, 1, 1, 1, 0],
- }
-
- assert_that(results,
- equal_to_per_window(answers),
- label='count assert per window')
-
- test_pipeline.run()
+ answers = ["1, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "1, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "1, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "1, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "1, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "1, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "1, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "1, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "1, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "1, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "1, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "1, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "1, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "1, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "1, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "1, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "1, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "1, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "1, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "1, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "0, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)"]
+
+ output = get_file_output()
+
+ if all(elem in output for elem in answers) and all(elem in answers for elem in output):
+ passed()
+ else:
+ failed("Try using an early trigger with the AfterWatermark trigger.")
if __name__ == '__main__':
- test_is_not_empty()
- test_output()
+ test_is_not_empty()
+ test_output()
diff --git a/learning/katas/python/Triggers/Event Time Triggers/Event Time
Triggers/task-info.yaml b/learning/katas/python/Triggers/Event Time
Triggers/Event Time Triggers/task-info.yaml
index 83ea497..51c7223 100644
--- a/learning/katas/python/Triggers/Event Time Triggers/Event Time
Triggers/task-info.yaml
+++ b/learning/katas/python/Triggers/Event Time Triggers/Event Time
Triggers/task-info.yaml
@@ -22,8 +22,8 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 1288
- length: 343
+ - offset: 1355
+ length: 392
placeholder_text: TODO()
- name: generate_event.py
visible: true
diff --git a/learning/katas/python/Triggers/Event Time Triggers/Event Time
Triggers/task.py b/learning/katas/python/Triggers/Event Time Triggers/Event
Time Triggers/task.py
index e5d5a60..a1ca61c 100644
--- a/learning/katas/python/Triggers/Event Time Triggers/Event Time
Triggers/task.py
+++ b/learning/katas/python/Triggers/Event Time Triggers/Event Time
Triggers/task.py
@@ -17,34 +17,33 @@
# under the License.
#
+# beam-playground:
+# name: EventTimeTriggers
+# description: Task from katas to count events with event time triggers
+# multifile: true
+# context_line: 46
+# categories:
+# - Streaming
+
import apache_beam as beam
from generate_event import GenerateEvent
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AccumulationMode
from apache_beam.transforms.trigger import AfterWatermark
from apache_beam.utils.timestamp import Duration
-from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.options.pipeline_options import StandardOptions
from log_elements import LogElements
-def apply_transform(events):
- return (events
- | beam.WindowInto(FixedWindows(5),
- trigger=AfterWatermark(),
- accumulation_mode=AccumulationMode.DISCARDING,
- allowed_lateness=Duration(seconds=0))
- | beam.CombineGlobally(beam.combiners.CountCombineFn()).without_defaults())
-
-
-def main():
- options = PipelineOptions()
- options.view_as(StandardOptions).streaming = True
- with beam.Pipeline(options=options) as p:
- events = p | GenerateEvent.sample_data()
- output = apply_transform(events)
- output | LogElements(with_window=True)
-
+class CountEvents(beam.PTransform):
+ def expand(self, events):
+ return (events
+ | beam.WindowInto(FixedWindows(5),
+ trigger=AfterWatermark(),
+ accumulation_mode=AccumulationMode.DISCARDING,
+ allowed_lateness=Duration(seconds=0))
+ | beam.CombineGlobally(beam.combiners.CountCombineFn()).without_defaults())
-if __name__ == "__main__":
- main()
+with beam.Pipeline() as p:
+ (p | GenerateEvent.sample_data()
+ | CountEvents()
+ | LogElements(with_window=True))
diff --git a/learning/katas/python/Triggers/Event Time Triggers/Event Time
Triggers/tests.py b/learning/katas/python/Triggers/Event Time Triggers/Event
Time Triggers/tests.py
index 9bc77bc..6af85b2 100644
--- a/learning/katas/python/Triggers/Event Time Triggers/Event Time
Triggers/tests.py
+++ b/learning/katas/python/Triggers/Event Time Triggers/Event Time
Triggers/tests.py
@@ -14,72 +14,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import pytz
-from datetime import datetime
-from task import apply_transform
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.test_stream import TestStream
-from apache_beam.transforms import window
-from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.options.pipeline_options import StandardOptions
-from apache_beam.testing.util import assert_that, equal_to, equal_to_per_window
from test_helper import failed, passed, get_file_output, test_is_not_empty
def test_output():
- options = PipelineOptions()
- options.view_as(StandardOptions).streaming = True
- test_pipeline = TestPipeline(options=options)
-
- events = ( test_pipeline
- | TestStream()
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 1, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 2, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 3, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 4, 0, tzinfo=pytz.UTC).timestamp())
- .advance_watermark_to(datetime(2021, 3, 1, 0, 0, 5, 0,
tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 5, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 6, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 7, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 8, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 9, 0, tzinfo=pytz.UTC).timestamp())
- .advance_watermark_to(datetime(2021, 3, 1, 0, 0, 10, 0,
tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 10, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 11, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 12, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 13, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 14, 0, tzinfo=pytz.UTC).timestamp())
- .advance_watermark_to(datetime(2021, 3, 1, 0, 0, 15, 0,
tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 15, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 16, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 17, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 18, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 19, 0, tzinfo=pytz.UTC).timestamp())
- .advance_watermark_to(datetime(2021, 3, 1, 0, 0, 20, 0,
tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 20, 0, tzinfo=pytz.UTC).timestamp())
- .advance_watermark_to(datetime(2021, 3, 1, 0, 0, 25, 0,
tzinfo=pytz.UTC).timestamp())
- .advance_watermark_to_infinity())
-
- results = apply_transform(events)
-
- answers = {
- window.IntervalWindow(datetime(2021, 3, 1, 0, 0, 0, 0,
tzinfo=pytz.UTC).timestamp(),
- datetime(2021, 3, 1, 0, 0, 5, 0,
tzinfo=pytz.UTC).timestamp()): [4],
- window.IntervalWindow(datetime(2021, 3, 1, 0, 0, 5, 0,
tzinfo=pytz.UTC).timestamp(),
- datetime(2021, 3, 1, 0, 0, 10, 0,
tzinfo=pytz.UTC).timestamp()): [5],
- window.IntervalWindow(datetime(2021, 3, 1, 0, 0, 10, 0,
tzinfo=pytz.UTC).timestamp(),
- datetime(2021, 3, 1, 0, 0, 15, 0,
tzinfo=pytz.UTC).timestamp()): [5],
- window.IntervalWindow(datetime(2021, 3, 1, 0, 0, 15, 0,
tzinfo=pytz.UTC).timestamp(),
- datetime(2021, 3, 1, 0, 0, 20, 0,
tzinfo=pytz.UTC).timestamp()): [5],
- window.IntervalWindow(datetime(2021, 3, 1, 0, 0, 20, 0,
tzinfo=pytz.UTC).timestamp(),
- datetime(2021, 3, 1, 0, 0, 25, 0,
tzinfo=pytz.UTC).timestamp()): [1],
- }
-
- assert_that(results,
- equal_to_per_window(answers),
- label='count assert per window')
-
- test_pipeline.run()
+ output = get_file_output()
+
+ answers = [
+ "4, window(start=2021-03-01T00:00:00Z, end=2021-03-01T00:00:05Z)",
+ "5, window(start=2021-03-01T00:00:05Z, end=2021-03-01T00:00:10Z)",
+ "5, window(start=2021-03-01T00:00:10Z, end=2021-03-01T00:00:15Z)",
+ "5, window(start=2021-03-01T00:00:15Z, end=2021-03-01T00:00:20Z)",
+ "1, window(start=2021-03-01T00:00:20Z, end=2021-03-01T00:00:25Z)"
+ ]
+
+ if all(line in output for line in answers) and all(line in answers for line in output):
+ passed()
+ else:
+ failed("Incorrect output. Count the number of events in each 5 seconds window.")
if __name__ == '__main__':
diff --git a/learning/katas/python/Triggers/Window Accumulation Mode/Window
Accumulation Mode/task-info.yaml b/learning/katas/python/Triggers/Window
Accumulation Mode/Window Accumulation Mode/task-info.yaml
index 56c6adf..193fec1 100644
--- a/learning/katas/python/Triggers/Window Accumulation Mode/Window
Accumulation Mode/task-info.yaml
+++ b/learning/katas/python/Triggers/Window Accumulation Mode/Window
Accumulation Mode/task-info.yaml
@@ -22,9 +22,9 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 1342
- length: 389
- placeholder_text: TODO()
+ - offset: 1571
+ length: 432
+ placeholder_text: TODO()
- name: tests.py
visible: false
- name: generate_event.py
diff --git a/learning/katas/python/Triggers/Window Accumulation Mode/Window
Accumulation Mode/task.py b/learning/katas/python/Triggers/Window Accumulation
Mode/Window Accumulation Mode/task.py
index 16048f3..cdd2d15 100644
--- a/learning/katas/python/Triggers/Window Accumulation Mode/Window
Accumulation Mode/task.py
+++ b/learning/katas/python/Triggers/Window Accumulation Mode/Window
Accumulation Mode/task.py
@@ -15,7 +15,14 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-#
+
+# beam-playground:
+# name: WindowAccumulationMode
+# description: Task from katas to count events using ACCUMULATING as accumulation mode
+# multifile: true
+# context_line: 51
+# categories:
+# - Streaming
import apache_beam as beam
from generate_event import GenerateEvent
@@ -29,23 +36,19 @@ from apache_beam.options.pipeline_options import StandardOptions
from log_elements import LogElements
-def apply_transform(events):
+class CountEventsWithAccumulating(beam.PTransform):
+ def expand(self, events):
return (events
- | beam.WindowInto(FixedWindows(1*24*60*60), # 1 Day Window
+ | beam.WindowInto(FixedWindows(1 * 24 * 60 * 60), # 1 Day Window
trigger=AfterWatermark(early=AfterCount(1)),
accumulation_mode=AccumulationMode.ACCUMULATING,
allowed_lateness=Duration(seconds=0))
 | beam.CombineGlobally(beam.combiners.CountCombineFn()).without_defaults())
-def main():
- options = PipelineOptions()
- options.view_as(StandardOptions).streaming = True
- with beam.Pipeline(options=options) as p:
- events = p | GenerateEvent.sample_data()
- output = apply_transform(events)
- output | LogElements(with_window=True)
-
-
-if __name__ == "__main__":
- main()
+options = PipelineOptions()
+options.view_as(StandardOptions).streaming = True
+with beam.Pipeline(options=options) as p:
+ (p | GenerateEvent.sample_data()
+ | CountEventsWithAccumulating()
+ | LogElements(with_window=True))
diff --git a/learning/katas/python/Triggers/Window Accumulation Mode/Window
Accumulation Mode/tests.py b/learning/katas/python/Triggers/Window Accumulation
Mode/Window Accumulation Mode/tests.py
index 80fd21b..aa3bd3b 100644
--- a/learning/katas/python/Triggers/Window Accumulation Mode/Window
Accumulation Mode/tests.py
+++ b/learning/katas/python/Triggers/Window Accumulation Mode/Window
Accumulation Mode/tests.py
@@ -14,69 +14,39 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import pytz
-from datetime import datetime
-from task import apply_transform
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.test_stream import TestStream
-from apache_beam.transforms import window
-from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.options.pipeline_options import StandardOptions
-from apache_beam.testing.util import assert_that, equal_to, equal_to_per_window
from test_helper import failed, passed, get_file_output, test_is_not_empty
def test_output():
- options = PipelineOptions()
- options.view_as(StandardOptions).streaming = True
- test_pipeline = TestPipeline(options=options)
-
- events = ( test_pipeline
- | TestStream()
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 1, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 2, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 3, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 4, 0, tzinfo=pytz.UTC).timestamp())
- .advance_watermark_to(datetime(2021, 3, 1, 0, 0, 5, 0,
tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 5, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 6, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 7, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 8, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 9, 0, tzinfo=pytz.UTC).timestamp())
- .advance_watermark_to(datetime(2021, 3, 1, 0, 0, 10, 0,
tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 10, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 11, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 12, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 13, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 14, 0, tzinfo=pytz.UTC).timestamp())
- .advance_watermark_to(datetime(2021, 3, 1, 0, 0, 15, 0,
tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 15, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 16, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 17, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 18, 0, tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 19, 0, tzinfo=pytz.UTC).timestamp())
- .advance_watermark_to(datetime(2021, 3, 1, 0, 0, 20, 0,
tzinfo=pytz.UTC).timestamp())
- .add_elements(elements=["event"],
event_timestamp=datetime(2021, 3, 1, 0, 0, 20, 0, tzinfo=pytz.UTC).timestamp())
- .advance_watermark_to(datetime(2021, 3, 1, 0, 0, 25, 0,
tzinfo=pytz.UTC).timestamp())
- .advance_watermark_to_infinity())
-
- results = apply_transform(events)
-
- answers = {
- window.IntervalWindow(datetime(2021, 3, 1, 0, 0, 0, 0,
tzinfo=pytz.UTC).timestamp(),
- datetime(2021, 3, 2, 0, 0, 0, 0,
tzinfo=pytz.UTC).timestamp()): [1, 2, 3, 4, 5,
-
6, 7, 8, 9, 10,
-
11, 12, 13, 14, 15,
-
16, 17, 18, 19, 20, 20],
- }
-
- assert_that(results,
- equal_to_per_window(answers),
- label='count assert per window')
-
- test_pipeline.run()
+ answers = ["1, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "2, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "3, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "4, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "5, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "6, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "7, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "8, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "9, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "10, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "11, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "12, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "13, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "14, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "15, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "16, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "17, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "18, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "19, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)",
+ "20, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)"]
+
+ output = get_file_output()
+
+ if all(elem in output for elem in answers) and all(elem in answers for elem in output):
+ passed()
+ else:
+ failed("Try using a count early trigger, with accumulating mode.")
if __name__ == '__main__':
- test_is_not_empty()
- test_output()
+ test_is_not_empty()
+ test_output()
diff --git a/learning/katas/python/Windowing/Adding
Timestamp/ParDo/task-info.yaml b/learning/katas/python/Windowing/Adding
Timestamp/ParDo/task-info.yaml
index 5c97141..7f4dabc 100644
--- a/learning/katas/python/Windowing/Adding Timestamp/ParDo/task-info.yaml
+++ b/learning/katas/python/Windowing/Adding Timestamp/ParDo/task-info.yaml
@@ -22,11 +22,11 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 1231
- length: 155
- placeholder_text: TODO()
- - offset: 1940
+ - offset: 2165
length: 30
placeholder_text: TODO()
+ - offset: 1451
+ length: 160
+ placeholder_text: TODO()
- name: tests.py
visible: false
diff --git a/learning/katas/python/Windowing/Fixed Time Window/Fixed Time
Window/task-info.yaml b/learning/katas/python/Windowing/Fixed Time Window/Fixed
Time Window/task-info.yaml
index a1462bb..27fb1ed 100644
--- a/learning/katas/python/Windowing/Fixed Time Window/Fixed Time
Window/task-info.yaml
+++ b/learning/katas/python/Windowing/Fixed Time Window/Fixed Time
Window/task-info.yaml
@@ -22,7 +22,7 @@ files:
- name: task.py
visible: true
placeholders:
- - offset: 2100
+ - offset: 2363
length: 87
placeholder_text: TODO()
- name: tests.py
diff --git a/learning/katas/python/requirements.txt
b/learning/katas/python/requirements.txt
index d3b069b..9b259a4 100644
--- a/learning/katas/python/requirements.txt
+++ b/learning/katas/python/requirements.txt
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-apache-beam==2.19.0
-apache-beam[test]==2.19.0
+apache-beam==2.36.0
+apache-beam[test]==2.36.0
-pytz~=2019.3
\ No newline at end of file
+pytz~=2021.3
\ No newline at end of file