I downgraded Flink from 1.7.1 to 1.5.6 and was able to get further, but the job still fails. Here is the latest error from Flink. Thanks!
The job cmd I launched:

python -m apache_beam.examples.wordcount --input=/etc/profile --output=/tmp/py-wordcount-direct --runner=PortableRunner --job_endpoint=localhost:8099 --parallelism=1 --OPTIONALflink_master=localhost:8081 --streaming --experiments=worker_threads=100 --execution_mode_for_batch=BATCH_FORCED --experiments=beam_fn_api

Jun

---- log starts ----
[flink-runner-job-server] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Running remotely at localhost:8081
[flink-runner-job-server] WARN org.apache.flink.configuration.Configuration - Config uses deprecated configuration key 'jobmanager.rpc.address' instead of proper key 'rest.address'
[flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
[flink-runner-job-server] INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 4ecb5e5cfd4718de440f48cbfaf7216a (detached: false).
[flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient - Shutting down rest endpoint.
[flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient - Rest endpoint shutdown complete.
[flink-runner-job-server] ERROR org.apache.beam.runners.flink.FlinkJobInvocation - Error during job invocation BeamApp-jwan-0121211115-328178bb_d2dadedb-6dbf-4c1e-82d4-208a2d3177e9.
org.apache.flink.client.program.ProgramInvocationException: Job 4ecb5e5cfd4718de440f48cbfaf7216a failed.
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
    at org.apache.beam.runners.flink.FlinkExecutionEnvironments$BeamFlinkRemoteStreamEnvironment.executeRemotely(FlinkExecutionEnvironments.java:355)
    at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:179)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator$StreamingTranslationContext.execute(FlinkStreamingPortablePipelineTranslator.java:158)
    at org.apache.beam.runners.flink.FlinkJobInvocation.runPipelineWithTranslator(FlinkJobInvocation.java:142)
    at org.apache.beam.runners.flink.FlinkJobInvocation.runPipeline(FlinkJobInvocation.java:112)
    at org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)
    at org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)
    at org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
    ... 12 more
Caused by: java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
    ... 1 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:694)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.emitWatermark(DoFnOperator.java:591)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:581)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:540)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
    ... 7 more
Caused by: java.lang.RuntimeException: Failed to finish remote bundle
    at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:626)
    at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:87)
    at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:677)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.processWatermark(ExecutableStageDoFnOperator.java:471)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:479)
    ... 12 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 21: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 148, in _execute
    response = task()
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 183, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 256, in do_instruction
    request.instruction_id)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 270, in process_bundle
    request.process_bundle_descriptor_reference) as bundle_processor:
  File "/usr/local/lib/python2.7/contextlib.py", line 17, in __enter__
    return self.gen.next()
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 292, in get_bundle_processor
    self.data_channel_factory)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 404, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 448, in create_execution_tree
    descriptor.transforms, key=topological_height, reverse=True)])
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 381, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 431, in get_operation
    in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 430, in <dictcomp>
    for tag, pcoll_id
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 381, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 434, in get_operation
    transform_id, transform_consumers)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 584, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 768, in create
    serialized_fn, parameter)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 806, in _create_pardo_operation
    dofn_data = pickler.loads(serialized_fn)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 247, in loads
    return dill.loads(s)
  File "/usr/local/lib/python2.7/site-packages/dill/_dill.py", line 316, in loads
    return load(file, ignore)
  File "/usr/local/lib/python2.7/site-packages/dill/_dill.py", line 304, in load
    obj = pik.load()
  File "/usr/local/lib/python2.7/pickle.py", line 864, in load
    dispatch[key](self)
  File "/usr/local/lib/python2.7/pickle.py", line 1230, in load_build
    d = inst.__dict__
AttributeError: 'apache_beam.utils.windowed_value.PaneInfo' object has no attribute '__dict__'

    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
    at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:263)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:623)
    ... 17 more
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 21: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 148, in _execute
    response = task()
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 183, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 256, in do_instruction
    request.instruction_id)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 270, in process_bundle
    request.process_bundle_descriptor_reference) as bundle_processor:
  File "/usr/local/lib/python2.7/contextlib.py", line 17, in __enter__
    return self.gen.next()
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 292, in get_bundle_processor
    self.data_channel_factory)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 404, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 448, in create_execution_tree
    descriptor.transforms, key=topological_height, reverse=True)])
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 381, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 431, in get_operation
    in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 430, in <dictcomp>
    for tag, pcoll_id
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 381, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 434, in get_operation
    transform_id, transform_consumers)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 584, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 768, in create
    serialized_fn, parameter)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 806, in _create_pardo_operation
    dofn_data = pickler.loads(serialized_fn)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 247, in loads
    return dill.loads(s)
  File "/usr/local/lib/python2.7/site-packages/dill/_dill.py", line 316, in loads
    return load(file, ignore)
  File "/usr/local/lib/python2.7/site-packages/dill/_dill.py", line 304, in load
    obj = pik.load()
  File "/usr/local/lib/python2.7/pickle.py", line 864, in load
    dispatch[key](self)
  File "/usr/local/lib/python2.7/pickle.py", line 1230, in load_build
    d = inst.__dict__
AttributeError: 'apache_beam.utils.windowed_value.PaneInfo' object has no attribute '__dict__'

    at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
    at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:140)
    at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:248)
    at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
    at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
    at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:263)
    at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:683)
    at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
---- log ends ----

On 2018/11/14 19:57:19, Ruoyun Huang <ruo...@google.com> wrote:
> To answer Maximilian's question.
>
> I am using Linux, debian distribution.
>
> It probably sounded too much when I used the word 'planned merge'. What I
> really meant entails less change than it sounds. More specifically:
>
> 1) The default behavior, where PortableRunner starts a flink server. It is
> confusing to new users.
> 2) All the related docs and inline comments. Similarly, it could be very
> confusing connecting PortableRunner to Flink server.
> 3) [Probably no longer an issue]. I couldn't make the flink server
> example working. And I could not make example working on Java-ULR either.
> Both will require debugging for resolutions. Thus I figured maybe let us
> only focus on one single thing: the java-ULR part, without worrying about
> Flink-server. Again, looks like this may not be a valid concern, given
> flink part is most likely due to my setup.
>
>
> On Wed, Nov 14, 2018 at 3:30 AM Maximilian Michels <m...@apache.org> wrote:
>
> > Hi Ruoyun,
> >
> > I just ran the wordcount locally using the instructions on the page.
> > I've tried the local file system and GCS. Both times it ran successfully
> > and produced valid output.
> >
> > I'm assuming there is some problem with your setup. Which platform are
> > you using? I'm on MacOS.
> >
> > Could you expand on the planned merge? From my understanding we will
> > always need PortableRunner in Python to be able to submit against the
> > Beam JobServer.
> >
> > Thanks,
> > Max
> >
> > On 14.11.18 00:39, Ruoyun Huang wrote:
> > > A quick follow-up on using current PortableRunner.
> > >
> > > I followed the exact three steps as Ankur and Maximilian shared in
> > > https://beam.apache.org/roadmap/portability/#python-on-flink ; The
> > > wordcount example keeps hanging after 10 minutes. I also tried
> > > specifying explicit input/output args, either using gcs folder or local
> > > file system, but none of them works.
> > >
> > > Spent some time looking into it but conclusion yet. At this point
> > > though, I guess it does not matter much any more, given we already have
> > > the plan of merging PortableRunner into using java reference runner
> > > (i.e. :beam-runners-reference-job-server).
> > >
> > > Still appreciated if someone can try out the python-on-flink
> > > <https://beam.apache.org/roadmap/portability/#python-on-flink>instructions
> > > in case it is just due to my local machine setup. Thanks!
> > >
> > >
> > > On Thu, Nov 8, 2018 at 5:04 PM Ruoyun Huang <ruo...@google.com
> > > <mailto:ruo...@google.com>> wrote:
> > >
> > > Thanks Maximilian!
> > >
> > > I am working on migrating existing PortableRunner to using java ULR
> > > (Link to Notes
> > > <https://docs.google.com/document/d/1S86saZqiDaE_M5wxO0zOQ_rwC6QHv7sp1BmGTm0dLNE/edit#>).
> > > If this issue is non-trivial to solve, I would vote for removing
> > > this default behavior as part of the consolidation.
> > >
> > > On Thu, Nov 8, 2018 at 2:58 AM Maximilian Michels <m...@apache.org
> > > <mailto:m...@apache.org>> wrote:
> > >
> > > In the long run, we should get rid of the Docker-inside-Docker approach,
> > > which was only intended for testing anyways. It would be cleaner to
> > > start the SDK harness container alongside with JobServer container.
> > >
> > > Short term, I think it should be easy to either fix the permissions of
> > > the mounted "docker" executable or use a Docker image for the JobServer
> > > which comes with Docker pre-installed.
> > >
> > > JIRA: https://issues.apache.org/jira/browse/BEAM-6020
> > >
> > > Thanks for reporting this Ruoyun!
> > >
> > > -Max
> > >
> > > On 08.11.18 00:10, Ruoyun Huang wrote:
> > > > Thanks Ankur and Maximilian.
> > > >
> > > > Just for reference in case other people encountering the same error
> > > > message, the "permission denied" error in my original email is exactly
> > > > due to dockerinsidedocker issue that Ankur mentioned. Thanks Ankur!
> > > > Didn't make the link when you said it, had to discover that in a hard
> > > > way (I thought it is due to my docker installation messed up).
> > > >
> > > > On Tue, Nov 6, 2018 at 1:53 AM Maximilian Michels <m...@apache.org <mailto:m...@apache.org>
> > > > <mailto:m...@apache.org <mailto:m...@apache.org>>> wrote:
> > > >
> > > > Hi,
> > > >
> > > > Please follow
> > > > https://beam.apache.org/roadmap/portability/#python-on-flink
> > > >
> > > > Cheers,
> > > > Max
> > > >
> > > > On 06.11.18 01:14, Ankur Goenka wrote:
> > > > > Hi,
> > > > >
> > > > > The Portable Runner requires a job server uri to work with. The current
> > > > > default job server docker image is broken because of docker inside
> > > > > docker issue.
> > > > >
> > > > > Please refer to
> > > > > https://beam.apache.org/roadmap/portability/#python-on-flink for how to
> > > > > run a wordcount using Portable Flink Runner.
> > > > >
> > > > > Thanks,
> > > > > Ankur
> > > > >
> > > > > On Mon, Nov 5, 2018 at 3:41 PM Ruoyun Huang <ruo...@google.com <mailto:ruo...@google.com>
> > > > > <mailto:ruo...@google.com <mailto:ruo...@google.com>>
> > > > > <mailto:ruo...@google.com <mailto:ruo...@google.com>
> > > > > <mailto:ruo...@google.com <mailto:ruo...@google.com>>>> wrote:
> > > > >
> > > > > Hi, Folks,
> > > > >
> > > > > I want to try out Python PortableRunner, by using following
> > > > > command:
> > > > >
> > > > > *sdk/python: python -m apache_beam.examples.wordcount
> > > > > --output=/tmp/test_output --runner PortableRunner*
> > > > >
> > > > > It complains with following error message:
> > > > >
> > > > > Caused by: java.lang.Exception: The user defined 'open()' method
> > > > > caused an exception: java.io.IOException: Cannot run program
> > > > > "docker": error=13, Permission denied
> > > > > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498)
> > > > > at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> > > > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
> > > > > ... 1 more
> > > > > Caused by:
> > > > > org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.util.concurrent.UncheckedExecutionException:
> > > > > java.io.IOException: Cannot run program "docker": error=13,
> > > > > Permission denied
> > > > > at org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4994)
> > > > >
> > > > > ... 7 more
> > > > >
> > > > >
> > > > > My py2 environment is properly configured, because DirectRunner
> > > > > works. Also I tested my docker installation by 'docker run
> > > > > hello-world ', no issue.
> > > > >
> > > > >
> > > > > Thanks.
> > > > > --
> > > > > ================
> > > > > Ruoyun Huang
> > > > >
> > > >
> > > >
> > > > --
> > > > ================
> > > > Ruoyun Huang
> > > >
> > >
> > >
> > > --
> > > ================
> > > Ruoyun Huang
> > >
> > >
> > >
> > > --
> > > ================
> > > Ruoyun Huang
> > >
> >
>
> --
> ================
> Ruoyun Huang
>
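
A note for readers who reach this thread through the final AttributeError in the log above. The Python traceback ends in pickle's load_build step reading inst.__dict__ on a PaneInfo object that has no such attribute. One possible explanation, which is only an assumption and is not confirmed anywhere in this thread, is that the PaneInfo class on the worker side has a different layout (for example a compiled class without a per-instance __dict__) from the class that produced the pickled state on the submitting side. The sketch below reproduces only that general failure mode with the Python 3 standard library. It does not use Beam or dill; SlottedPaneInfo, ReaderSideUnpickler and load_on_reader_side are hypothetical names, and types.SimpleNamespace merely stands in for the writer-side object.

```python
import io
import pickle
import types


class SlottedPaneInfo(object):
    """Hypothetical reader-side class: __slots__ means instances have no __dict__."""
    __slots__ = ("is_first",)


class ReaderSideUnpickler(pickle.Unpickler):
    """Resolves every pickled class reference to SlottedPaneInfo.

    This imitates (as an assumption) a reader whose class layout differs
    from the layout the writer used when it pickled the object.
    """

    def find_class(self, module, name):
        return SlottedPaneInfo


def load_on_reader_side(payload):
    """Unpickle bytes with the mismatched reader-side class."""
    return ReaderSideUnpickler(io.BytesIO(payload)).load()


# Writer side: SimpleNamespace pickles its attributes as a plain dict state,
# which the unpickler later tries to merge into the new instance's __dict__.
payload = pickle.dumps(types.SimpleNamespace(is_first=True))

try:
    load_on_reader_side(payload)
    error_message = None
except AttributeError as exc:
    # The unpickler's BUILD step fails because the instance has no __dict__.
    error_message = str(exc)

print(error_message)
```

If this is what is happening here, comparing the apache_beam version and build on the submitting machine with the one inside the SDK harness would be a reasonable first check; that suggestion is a guess based on the trace, not a verified fix.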