> Is it one of the best guarded secrets? ;-)

Apparently so! I filed a few related JIRAs and assigned them to myself.

[1] https://issues.apache.org/jira/browse/BEAM-8214
[2] https://issues.apache.org/jira/browse/BEAM-8232
[3] https://issues.apache.org/jira/browse/BEAM-8233

Kyle Weaver | Software Engineer | github.com/ibzib | [email protected]

On Fri, Sep 13, 2019 at 9:57 AM Robert Bradshaw <[email protected]> wrote:

> Note that loopback won't fix the problem for, say, cross-language IOs.
> But, yes, it's really handy and should probably be used more.
>
> On Fri, Sep 13, 2019 at 8:29 AM Lukasz Cwik <[email protected]> wrote:
>
>> And/or update the wiki/website with some how to's...
>>
>> On Fri, Sep 13, 2019 at 7:51 AM Thomas Weise <[email protected]> wrote:
>>
>>> I agree that loopback would be preferable for this purpose. I just
>>> wasn't aware this even works with the portable Flink runner. Is it one of
>>> the best guarded secrets? ;-)
>>>
>>> Kyle, can you please post the pipeline options you would use for Flink?
>>>
>>>
>>> On Thu, Sep 12, 2019 at 5:57 PM Kyle Weaver <[email protected]> wrote:
>>>
>>>> I prefer loopback because a) it writes output files to the local
>>>> filesystem, as the user expects, and b) you don't have to pull or build
>>>> docker images, or even have docker installed on your system -- which is one
>>>> less point of failure.
>>>>
>>>> Kyle Weaver | Software Engineer | github.com/ibzib |
>>>> [email protected]
>>>>
>>>>
>>>> On Thu, Sep 12, 2019 at 5:48 PM Thomas Weise <[email protected]> wrote:
>>>>
>>>>> This should become much better with 2.16 when we have the Docker
>>>>> images prebuilt.
>>>>>
>>>>> Docker is probably still the best option for Python on a JVM based
>>>>> runner in a local environment that does not have a development setup.
>>>>>
>>>>>
>>>>> On Thu, Sep 12, 2019 at 1:09 PM Kyle Weaver <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> +dev <[email protected]> I think we should probably point new
>>>>>> users of the portable Flink/Spark runners to use loopback or some other
>>>>>> non-docker environment, as Docker adds some operational complexity that
>>>>>> isn't really needed to run a word count example.
>>>>>> For example, Yu's pipeline errored here because the expected Docker
>>>>>> container wasn't built before running.
>>>>>>
>>>>>> Kyle Weaver | Software Engineer | github.com/ibzib |
>>>>>> [email protected]
>>>>>>
>>>>>>
>>>>>> On Thu, Sep 12, 2019 at 11:27 AM Robert Bradshaw <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> On this note, making local files easy to read is something we'd
>>>>>>> definitely like to improve, as the current behavior is quite surprising.
>>>>>>> This could be useful not just for running with docker and the portable
>>>>>>> runner locally, but more generally when running on a distributed system
>>>>>>> (e.g. a Flink/Spark cluster or Dataflow). It would be very convenient
>>>>>>> if we could automatically stage local files to be read as artifacts that
>>>>>>> could be consumed by any worker (possibly via external directory
>>>>>>> mounting in the local docker case rather than an actual copy), and
>>>>>>> conversely copy small outputs back to the local machine (with the
>>>>>>> similar optimization for local docker).
>>>>>>>
>>>>>>> At the very least, however, obvious messaging when the local
>>>>>>> filesystem is used from within docker, which is often a (non-obvious and
>>>>>>> hard to debug) mistake should be added.
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Sep 12, 2019 at 10:34 AM Lukasz Cwik <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> When you use a local filesystem path and a docker environment,
>>>>>>>> "/tmp" is written inside the container. You can solve this issue by:
>>>>>>>> * Using a "remote" filesystem such as HDFS/S3/GCS/...
>>>>>>>> * Mounting an external directory into the container so that any
>>>>>>>> "local" writes appear outside the container
>>>>>>>> * Using a non-docker environment such as external or process.
>>>>>>>>
>>>>>>>> On Thu, Sep 12, 2019 at 4:51 AM Yu Watanabe <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hello.
>>>>>>>>>
>>>>>>>>> I would like to ask for help with my sample code using portable
>>>>>>>>> runner using apache flink.
>>>>>>>>> I was able to work out the wordcount.py using this page.
>>>>>>>>>
>>>>>>>>> https://beam.apache.org/roadmap/portability/
>>>>>>>>>
>>>>>>>>> I got below two files under /tmp.
>>>>>>>>>
>>>>>>>>> -rw-r--r-- 1 ywatanabe ywatanabe 185 Sep 12 19:56 py-wordcount-direct-00001-of-00002
>>>>>>>>> -rw-r--r-- 1 ywatanabe ywatanabe 190 Sep 12 19:56 py-wordcount-direct-00000-of-00002
>>>>>>>>>
>>>>>>>>> Then I wrote sample code with below steps.
>>>>>>>>>
>>>>>>>>> 1. Install apache_beam using pip3 separate from source code
>>>>>>>>>    directory.
>>>>>>>>> 2. Wrote sample code as below and named it
>>>>>>>>>    "test-portable-runner.py". Placed it separate directory from source
>>>>>>>>>    code.
>>>>>>>>>
>>>>>>>>> -----------------------------------------------------------------------------------
>>>>>>>>> (python) ywatanabe@debian-09-00:~$ ls -ltr
>>>>>>>>> total 16
>>>>>>>>> drwxr-xr-x 18 ywatanabe ywatanabe 4096 Sep 12 19:06 beam (<- source code directory)
>>>>>>>>> -rw-r--r-- 1 ywatanabe ywatanabe 634 Sep 12 20:25 test-portable-runner.py
>>>>>>>>>
>>>>>>>>> -----------------------------------------------------------------------------------
>>>>>>>>> 3.
>>>>>>>>>    Executed the code with "python3 test-portable-runner.py"
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ==========================================================================================
>>>>>>>>> #!/usr/bin/env python3
>>>>>>>>>
>>>>>>>>> import apache_beam as beam
>>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>>> from apache_beam.io import WriteToText
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> def printMsg(line):
>>>>>>>>>     # Log each element, then pass it through unchanged.
>>>>>>>>>     print("OUTPUT: {0}".format(line))
>>>>>>>>>     return line
>>>>>>>>>
>>>>>>>>> # Submit to the job server listening on localhost:8099.
>>>>>>>>> options = PipelineOptions(["--runner=PortableRunner",
>>>>>>>>>                            "--job_endpoint=localhost:8099",
>>>>>>>>>                            "--shutdown_sources_on_final_watermark"])
>>>>>>>>>
>>>>>>>>> p = beam.Pipeline(options=options)
>>>>>>>>>
>>>>>>>>> output = (p | 'create' >> beam.Create(["a", "b", "c"])
>>>>>>>>>             | beam.Map(printMsg)
>>>>>>>>>          )
>>>>>>>>>
>>>>>>>>> output | 'write' >> WriteToText('/tmp/sample.txt')
>>>>>>>>>
>>>>>>>>> # Run the pipeline and block until it finishes (the traceback further
>>>>>>>>> # down shows this call at line 27 of the original script).
>>>>>>>>> result = p.run()
>>>>>>>>> result.wait_until_finish()
>>>>>>>>>
>>>>>>>>> =======================================================================================
>>>>>>>>>
>>>>>>>>> The job seemed to go all the way to the "FINISHED" state.
>>>>>>>>>
>>>>>>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>>>>> [DataSource (Impulse) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: DataSource (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) [DEPLOYING].
>>>>>>>>> [DataSource (Impulse) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - DataSource (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) switched from DEPLOYING to RUNNING.
>>>>>>>>> [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSource (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) switched from DEPLOYING to RUNNING.
>>>>>>>>> [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN MapPartition (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1) (0b60f1a9e620b061f85e97998aa4b181) switched from CREATED to SCHEDULED.
>>>>>>>>> [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN MapPartition (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1) (0b60f1a9e620b061f85e97998aa4b181) switched from SCHEDULED to DEPLOYING.
>>>>>>>>> [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying CHAIN MapPartition (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1) (attempt #0) to aad5f142-dbb7-4900-a5ae-af8a6fdcc787 @ localhost (dataPort=-1)
>>>>>>>>> [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task CHAIN MapPartition (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) (1/1).
>>>>>>>>> [DataSource (Impulse) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task DataSource (Impulse) (1/1) (d51e4171c0af881342eaa8a7ab06def8) [DEPLOYING].
>>>>>>>>> [DataSource (Impulse) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: DataSource (Impulse) (1/1) (d51e4171c0af881342eaa8a7ab06def8) [DEPLOYING].
>>>>>>>>> [DataSource (Impulse) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - DataSource (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) switched from RUNNING to FINISHED.
>>>>>>>>>
>>>>>>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>>>>>
>>>>>>>>> But I ended up with docker error on client side.
>>>>>>>>>
>>>>>>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>>>>> (python) ywatanabe@debian-09-00:~$ python3 test-portable-runner.py
>>>>>>>>> /home/ywatanabe/python/lib/python3.5/site-packages/apache_beam/__init__.py:84: UserWarning: Some syntactic constructs of Python 3 are not yet fully supported by Apache Beam.
>>>>>>>>>   'Some syntactic constructs of Python 3 are not yet fully supported by '
>>>>>>>>> ERROR:root:java.io.IOException: Received exit code 125 for command 'docker run -d --network=host --env=DOCKER_MAC_CONTAINER=null ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=13-1 --logging_endpoint=localhost:39049 --artifact_endpoint=localhost:33779 --provision_endpoint=localhost:34827 --control_endpoint=localhost:36079'.
>>>>>>>>> stderr: Unable to find image 'ywatanabe-docker-apache.bintray.io/beam/python3:latest' locallydocker: Error response from daemon: unknown: Subject ywatanabe was not found.See 'docker run --help'.
>>>>>>>>> Traceback (most recent call last):
>>>>>>>>>   File "test-portable-runner.py", line 27, in <module>
>>>>>>>>>     result.wait_until_finish()
>>>>>>>>>   File "/home/ywatanabe/python/lib/python3.5/site-packages/apache_beam/runners/portability/portable_runner.py", line 446, in wait_until_finish
>>>>>>>>>     self._job_id, self._state, self._last_error_message()))
>>>>>>>>> RuntimeError: Pipeline BeamApp-ywatanabe-0912112516-22d84d63_d3b5e778-d771-4118-9ba7-c9dde85938e9 failed in state FAILED: java.io.IOException: Received exit code 125 for command 'docker run -d --network=host --env=DOCKER_MAC_CONTAINER=null ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=13-1 --logging_endpoint=localhost:39049 --artifact_endpoint=localhost:33779 --provision_endpoint=localhost:34827 --control_endpoint=localhost:36079'.
>>>>>>>>> stderr: Unable to find image 'ywatanabe-docker-apache.bintray.io/beam/python3:latest' locallydocker: Error response from daemon: unknown: Subject ywatanabe was not found.See 'docker run --help'.
>>>>>>>>>
>>>>>>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>>>>>
>>>>>>>>> As a result, I got nothing under /tmp. The code works when using
>>>>>>>>> DirectRunner.
>>>>>>>>> May I ask where I should look in order to get the pipeline
>>>>>>>>> to write results to text files under /tmp?
>>>>>>>>>
>>>>>>>>> Best Regards,
>>>>>>>>> Yu Watanabe
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Yu Watanabe
>>>>>>>>> Weekend Freelancer who loves to challenge building data platform
>>>>>>>>> [email protected]
>>>>>>>>> [image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1>
>>>>>>>>> [image: Twitter icon] <https://twitter.com/yuwtennis>
>>>>>>>>>
>>>>>>>>
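
For reference, here is a minimal sketch of the loopback setup discussed in this thread. It assumes a Flink job server is already listening on localhost:8099 (the endpoint from Yu's script) and that it runs on the same machine as the submitting Python process. The helper names `loopback_options` and `run_sample` and the output prefix `/tmp/sample` are hypothetical and used only for illustration. `--environment_type=LOOPBACK` is the Beam Python SDK option that has the runner call back into the submitting process for workers, so no Docker image is needed and writes to /tmp land on the local filesystem.

```python
def loopback_options(job_endpoint="localhost:8099"):
    """Build PortableRunner options that avoid Docker (hypothetical helper)."""
    return [
        "--runner=PortableRunner",
        "--job_endpoint={0}".format(job_endpoint),
        # LOOPBACK: the runner calls back into this Python process for workers.
        "--environment_type=LOOPBACK",
    ]


def run_sample(output_prefix="/tmp/sample"):
    """Run the three-element pipeline from the thread (needs a job server)."""
    # Imported here so the options helper above works without Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(loopback_options())
    # The context manager runs the pipeline and waits for it on exit.
    with beam.Pipeline(options=options) as p:
        (p
         | "create" >> beam.Create(["a", "b", "c"])
         | "write" >> beam.io.WriteToText(output_prefix))
```

Calling `run_sample()` while the job server is up should leave output files whose names start with `/tmp/sample` on the local disk. As Robert notes above, loopback does not help with cross-language IOs, so treat this as a local development setup only.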
