[
https://issues.apache.org/jira/browse/BEAM-10150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17548625#comment-17548625
]
Danny McCormick commented on BEAM-10150:
----------------------------------------
This issue has been migrated to https://github.com/apache/beam/issues/20248
> save_main_session not working in DoFn.setup() in Dataflow with Python
> ---------------------------------------------------------------------
>
> Key: BEAM-10150
> URL: https://issues.apache.org/jira/browse/BEAM-10150
> Project: Beam
> Issue Type: Bug
> Components: runner-dataflow, sdk-py-core
> Affects Versions: 2.21.0
> Environment: Python 3.7, Dataflow, Beam 2.21.0
> Reporter: Ryan Canty
> Priority: P3
>
> I have a dataflow pipeline that calls the DLP API in GCP. The relevant pieces
> of my code are:
> {code:python}
> from google.cloud.dlp import DlpServiceClient
> def run(argv):
> """Main entry point; defines and runs the pipeline."""
> opts = HashPipelineOptions()
> opts.view_as(SetupOptions).save_main_session = True
> std_opts = opts.view_as(StandardOptions)
> std_opts.streaming = True
> ...
> class DlpFindingDoFn(beam.DoFn):
> """Fetch DLP Findings as a PCollection"""
> def __init__(self, project, runner):
> beam.DoFn.__init__(self)
> self.project = project
> self.runner = runner
> self.inspect_config = {
> 'info_types' : [{'name': 'US_SOCIAL_SECURITY_NUMBER'}],
> 'min_likelihood': 'VERY_UNLIKELY',
> 'include_quote' : True # We need the output to match against the KV Store
> }
> def setup(self):
> self.dlp_client = DlpServiceClient()
> def process(self, element):
> # TODO: Remove when version 2.22.0 is released BEAM-7885
> if self.runner == 'DirectRunner':
> self.setup()
> # Convert the project id into a full resource id.
> parent = self.dlp_client.project_path(self.project)
> filename, chunk = element
> # Call the API.
> response = self.dlp_client.inspect_content(parent, self.inspect_config,
> {'value': chunk})
> if response.result.findings:
> for f in response.result.findings:
> yield (filename, f.quote)
> {code}
>
>
> This runs just fine locally, but when I run it in Dataflow I get this
> NameError because the class DlpServiceClient cannot be found.
> {code:java}
> 2020-05-29 06:29:19.799 PDT Error message from worker:
> java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error
> received from SDK harness for instruction -2659: Traceback (most recent call
> last): File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 368, in get processor =
> self.cached_bundle_processors[bundle_descriptor_id].pop() IndexError: pop
> from empty list During handling of the above exception, another exception
> occurred: Traceback (most recent call last): File
> "apache_beam/runners/common.py", line 1004, in
> apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method File
> "apache_beam/runners/common.py", line 488, in
> apache_beam.runners.common.DoFnInvoker.invoke_setup File "hashpipeline.py",
> line 108, in setup NameError: name 'DlpServiceClient' is not defined During
> handling of the above exception, another exception occurred: Traceback (most
> recent call last): File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 245, in _execute response = task() File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 302, in <lambda> lambda: self.create_worker().do_instruction(request),
> request) File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 471, in do_instruction getattr(request, request_type),
> request.instruction_id) File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 500, in process_bundle instruction_id,
> request.process_bundle_descriptor_id) File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 374, in get self.data_channel_factory) File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 843, in __init__ op.setup() File
> "apache_beam/runners/worker/operations.py", line 611, in
> apache_beam.runners.worker.operations.DoOperation.setup File
> "apache_beam/runners/worker/operations.py", line 660, in
> apache_beam.runners.worker.operations.DoOperation.setup File
> "apache_beam/runners/common.py", line 1010, in
> apache_beam.runners.common.DoFnRunner.setup File
> "apache_beam/runners/common.py", line 1006, in
> apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method File
> "apache_beam/runners/common.py", line 1045, in
> apache_beam.runners.common.DoFnRunner._reraise_augmented File
> "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446,
> in raise_with_traceback raise exc.with_traceback(traceback) File
> "apache_beam/runners/common.py", line 1004, in
> apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method File
> "apache_beam/runners/common.py", line 488, in
> apache_beam.runners.common.DoFnInvoker.invoke_setup File "hashpipeline.py",
> line 108, in setup NameError: name 'DlpServiceClient' is not defined [while
> running 'generatedPtransform-2651']
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
> org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:333)
>
> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
>
> org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123)
>
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1363)
>
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:153)
>
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1086)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException:
> Error received from SDK harness for instruction -2659: Traceback (most recent
> call last): File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 368, in get processor =
> self.cached_bundle_processors[bundle_descriptor_id].pop() IndexError: pop
> from empty list During handling of the above exception, another exception
> occurred: Traceback (most recent call last): File
> "apache_beam/runners/common.py", line 1004, in
> apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method File
> "apache_beam/runners/common.py", line 488, in
> apache_beam.runners.common.DoFnInvoker.invoke_setup File "hashpipeline.py",
> line 108, in setup NameError: name 'DlpServiceClient' is not defined During
> handling of the above exception, another exception occurred: Traceback (most
> recent call last): File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 245, in _execute response = task() File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 302, in <lambda> lambda: self.create_worker().do_instruction(request),
> request) File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 471, in do_instruction getattr(request, request_type),
> request.instruction_id) File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 500, in process_bundle instruction_id,
> request.process_bundle_descriptor_id) File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 374, in get self.data_channel_factory) File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 843, in __init__ op.setup() File
> "apache_beam/runners/worker/operations.py", line 611, in
> apache_beam.runners.worker.operations.DoOperation.setup File
> "apache_beam/runners/worker/operations.py", line 660, in
> apache_beam.runners.worker.operations.DoOperation.setup File
> "apache_beam/runners/common.py", line 1010, in
> apache_beam.runners.common.DoFnRunner.setup File
> "apache_beam/runners/common.py", line 1006, in
> apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method File
> "apache_beam/runners/common.py", line 1045, in
> apache_beam.runners.common.DoFnRunner._reraise_augmented File
> "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446,
> in raise_with_traceback raise exc.with_traceback(traceback) File
> "apache_beam/runners/common.py", line 1004, in
> apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method File
> "apache_beam/runners/common.py", line 488, in
> apache_beam.runners.common.DoFnInvoker.invoke_setup File "hashpipeline.py",
> line 108, in setup NameError: name 'DlpServiceClient' is not defined [while
> running 'generatedPtransform-2651']
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:178)
>
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:158)
>
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
>
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
>
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
>
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
>
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
>
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
>
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> java.lang.Thread.run(Thread.java:748)
> {code}
> However, when I move the import line inside the setup function, everything
> works fine.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)