[
https://issues.apache.org/jira/browse/BEAM-7926?focusedWorklogId=353789&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-353789
]
ASF GitHub Bot logged work on BEAM-7926:
----------------------------------------
Author: ASF GitHub Bot
Created on: 04/Dec/19 21:21
Start Date: 04/Dec/19 21:21
Worklog Time Spent: 10m
Work Description: KevinGG commented on pull request #10276: [BEAM-7926]
Data-centric Interactive Part1
URL: https://github.com/apache/beam/pull/10276#discussion_r353988904
##########
File path:
sdks/python/apache_beam/runners/interactive/testing/pipeline_assertion.py
##########
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+def assert_pipeline_equal(test_case, expected_pipeline, actual_pipeline):
+  """Asserts the equivalence between two given apache_beam.Pipeline instances.
+
+  Args:
+    test_case: (unittest.TestCase) the unittest testcase where it asserts.
+    expected_pipeline: (Pipeline) the pipeline instance expected.
+    actual_pipeline: (Pipeline) the actual pipeline instance to be asserted.
+  """
+  actual_pipeline_proto = actual_pipeline.to_runner_api(use_fake_coders=True)
+  expected_pipeline_proto = expected_pipeline.to_runner_api(
+      use_fake_coders=True)
+  components1 = actual_pipeline_proto.components
+  components2 = expected_pipeline_proto.components
+  test_case.assertEqual(len(components1.transforms),
+                        len(components2.transforms))
+  test_case.assertEqual(len(components1.pcollections),
+                        len(components2.pcollections))
+
+  test_case.assertEqual(len(components1.windowing_strategies),
+                        len(components2.windowing_strategies))
+  test_case.assertEqual(len(components1.coders), len(components2.coders))
+  _assert_transform_equal(test_case,
+                          actual_pipeline_proto,
+                          actual_pipeline_proto.root_transform_ids[0],
+                          expected_pipeline_proto,
+                          expected_pipeline_proto.root_transform_ids[0])
+
+
+def _assert_transform_equal(test_case, actual_pipeline_proto,
+                            actual_transform_id,
+                            expected_pipeline_proto, expected_transform_id):
+  """Asserts the equivalence between transforms from two given pipelines. """
+  transform_proto1 = actual_pipeline_proto.components.transforms[
+      actual_transform_id]
+  transform_proto2 = expected_pipeline_proto.components.transforms[
+      expected_transform_id]
+  test_case.assertEqual(transform_proto1.spec.urn, transform_proto2.spec.urn)
+  # Skipping payload checking because PTransforms of the same functionality
Review comment:
Probably, if they have the same label.
This functionality was originally part of `pipeline_analyzer_test`, and the
module is used to verify whether the implicit cache transforms are applied as
expected, so no user code is involved in any test that uses this assertion.
I'll add a module-level docstring to clarify this utility's usage:
```
"""Module to verify implicit cache transforms applied by Interactive Beam.
For internal use only; no backwards-compatibility guarantees.
This utility should only be used by Interactive Beam tests. It verifies if the
implicit cache transforms are applied as expected when running a pipeline with
the InteractiveRunner. It shouldn't be used to verify equivalence between
pipelines if the code to be tested depends on or mutates user code in
pipelines.
"""
```
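For reference, the kind of structural check described above can be sketched without Beam at all. This is only an illustration under stated assumptions: `FakeTransform` and `assert_transform_tree_equal` are hypothetical names standing in for the runner API protos and the helper in the diff, and the recursion into subtransforms is an assumption, since the quoted diff is cut off right after the URN comparison.

```python
import unittest
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class FakeTransform:
    """Hypothetical stand-in for a PTransform proto: a URN plus child ids."""
    urn: str
    subtransforms: List[str] = field(default_factory=list)


def assert_transform_tree_equal(test_case: unittest.TestCase,
                                actual: Dict[str, FakeTransform],
                                actual_id: str,
                                expected: Dict[str, FakeTransform],
                                expected_id: str) -> None:
    """Compares URNs and child counts recursively; ids and payloads are ignored."""
    actual_t, expected_t = actual[actual_id], expected[expected_id]
    test_case.assertEqual(actual_t.urn, expected_t.urn)
    # Assumption: children are compared pairwise in declaration order.
    test_case.assertEqual(len(actual_t.subtransforms),
                          len(expected_t.subtransforms))
    for a_child, e_child in zip(actual_t.subtransforms,
                                expected_t.subtransforms):
        assert_transform_tree_equal(test_case, actual, a_child,
                                    expected, e_child)


# Two trees with different ids but the same shape and URNs compare equal.
# 'composite' is an illustrative URN, not one taken from the PR.
actual = {'root': FakeTransform('composite', ['a1']),
          'a1': FakeTransform('beam:transform:pardo:v1')}
expected = {'r': FakeTransform('composite', ['e1']),
            'e1': FakeTransform('beam:transform:pardo:v1')}
assert_transform_tree_equal(unittest.TestCase(), actual, 'root', expected, 'r')
```

Because only URNs and tree shape are compared, two transforms that wrap different user functions would still pass, which is why the docstring restricts the utility to Interactive Beam's own tests.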
----------------------------------------------------------------
Issue Time Tracking
-------------------
Worklog Id: (was: 353789)
Time Spent: 24h 50m (was: 24h 40m)
> Show PCollection with Interactive Beam in a data-centric user flow
> ------------------------------------------------------------------
>
> Key: BEAM-7926
> URL: https://issues.apache.org/jira/browse/BEAM-7926
> Project: Beam
> Issue Type: New Feature
> Components: runner-py-interactive
> Reporter: Ning Kang
> Assignee: Ning Kang
> Priority: Major
> Time Spent: 24h 50m
> Remaining Estimate: 0h
>
> Support auto plotting / charting of materialized data of a given PCollection
> with Interactive Beam.
> Say an Interactive Beam pipeline is defined as
>
> {code:python}
> p = beam.Pipeline(InteractiveRunner())
> pcoll = p | 'Transform' >> transform()
> pcoll2 = ...
> pcoll3 = ...{code}
> The user can call a single function and get auto-magical charting of the data.
> e.g.,
> {code:python}
> show(pcoll, pcoll2)
> {code}
> In the process, a pipeline fragment is built that includes only the
> transforms necessary to produce the desired pcolls (pcoll and pcoll2), and
> that fragment is then executed.
> This makes the Interactive Beam user flow data-centric.
>
> Detailed
> [design|https://docs.google.com/document/d/1DYWrT6GL_qDCXhRMoxpjinlVAfHeVilK5Mtf8gO6zxQ/edit#heading=h.v6k2o3roarzz].
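The fragment-building step in the description above can be illustrated with a small backward traversal. This is a minimal sketch, not Interactive Beam's implementation: `required_transforms`, the two dictionaries, and the names `Transform2` and `Transform3` are hypothetical, chosen only to mirror the `pcoll`, `pcoll2`, `pcoll3` example in the issue.

```python
from typing import Dict, Iterable, List, Set


def required_transforms(producers: Dict[str, str],
                        inputs: Dict[str, List[str]],
                        targets: Iterable[str]) -> Set[str]:
    """Returns the transforms needed to produce the target PCollections.

    producers maps a PCollection name to the transform that outputs it;
    inputs maps a transform name to the PCollections it consumes.
    """
    needed: Set[str] = set()
    stack = list(targets)
    while stack:
        pcoll = stack.pop()
        transform = producers.get(pcoll)
        # Skip unknown PCollections and transforms already collected.
        if transform is None or transform in needed:
            continue
        needed.add(transform)
        # Walk upstream through everything this transform consumes.
        stack.extend(inputs.get(transform, []))
    return needed


# Hypothetical linear pipeline: Transform -> Transform2 -> Transform3.
producers = {'pcoll': 'Transform', 'pcoll2': 'Transform2',
             'pcoll3': 'Transform3'}
inputs = {'Transform': [], 'Transform2': ['pcoll'], 'Transform3': ['pcoll2']}

print(sorted(required_transforms(producers, inputs, ['pcoll', 'pcoll2'])))
# ['Transform', 'Transform2']
```

Asking for `pcoll` and `pcoll2` leaves `Transform3` out of the fragment, so only the work needed for the requested data would be executed.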