Abacn opened a new issue, #23632:
URL: https://github.com/apache/beam/issues/23632

   ### What happened?
   
   When running a Python pipeline with the Flink runner, FlinkJobInvoker tries 
to translate the pipeline option names with the heuristic abc_def -> abcDef. For 
sdk_harness_log_level_overrides, this yields sdkHarnessLogLevelOverrides, 
which is an existing option in SdkHarnessOptions. However, 
SdkHarnessOptions.sdkHarnessLogLevelOverrides is a Map<String, String>, while 
sdk_harness_log_level_overrides is a list. The following exception is then 
thrown when the job is invoked:
   
   ```
   INFO:apache_beam.utils.subprocess_server:SEVERE: Encountered Unexpected 
Exception for Preparation job_2a9da77a-4230-4232-aee0-546173e9b0e4
   INFO:apache_beam.utils.subprocess_server:java.lang.RuntimeException: Unable 
to parse representation
   INFO:apache_beam.utils.subprocess_server:    at 
org.apache.beam.sdk.options.ProxyInvocationHandler.getValueFromJson(ProxyInvocationHandler.java:586)
   INFO:apache_beam.utils.subprocess_server:    at 
org.apache.beam.sdk.options.ProxyInvocationHandler.getValueFromJson(ProxyInvocationHandler.java:579)
   INFO:apache_beam.utils.subprocess_server:    at 
org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:219)
   INFO:apache_beam.utils.subprocess_server:    at 
com.sun.proxy.$Proxy44.getSdkHarnessLogLevelOverrides(Unknown Source)
   INFO:apache_beam.utils.subprocess_server:    at 
org.apache.beam.runners.flink.FlinkJobInvoker.setLogLevelsFromPipelineOption(FlinkJobInvoker.java:109)
   INFO:apache_beam.utils.subprocess_server:    at 
org.apache.beam.runners.flink.FlinkJobInvoker.invokeWithExecutor(FlinkJobInvoker.java:79)
   INFO:apache_beam.utils.subprocess_server:    at 
org.apache.beam.runners.jobsubmission.JobInvoker.invoke(JobInvoker.java:48)
   INFO:apache_beam.utils.subprocess_server:    at 
org.apache.beam.runners.jobsubmission.InMemoryJobService.run(InMemoryJobService.java:246)
   INFO:apache_beam.utils.subprocess_server:    at 
org.apache.beam.model.jobmanagement.v1.JobServiceGrpc$MethodHandlers.invoke(JobServiceGrpc.java:949)
   INFO:apache_beam.utils.subprocess_server:    at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
   INFO:apache_beam.utils.subprocess_server:    at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
   INFO:apache_beam.utils.subprocess_server:    at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
   INFO:apache_beam.utils.subprocess_server:    at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
   INFO:apache_beam.utils.subprocess_server:    at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
   INFO:apache_beam.utils.subprocess_server:    at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:354)
   INFO:apache_beam.utils.subprocess_server:    at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
   INFO:apache_beam.utils.subprocess_server:    at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
   INFO:apache_beam.utils.subprocess_server:    at 
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
   INFO:apache_beam.utils.subprocess_server:    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   INFO:apache_beam.utils.subprocess_server:    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   INFO:apache_beam.utils.subprocess_server:    at 
java.lang.Thread.run(Thread.java:750)
   INFO:apache_beam.utils.subprocess_server:Caused by: 
com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct 
instance of `java.util.LinkedHashMap` (although at least one Creator exists): 
no String-argument constructor/factory method to deserialize from String value 
('{"apache_beam.runners.dataflow":"WARNING","apache_beam.runners.dataflow.foo":"INFO"}')
   INFO:apache_beam.utils.subprocess_server: at [Source: UNKNOWN; byte offset: 
#UNKNOWN] (through reference chain: java.util.ArrayList[0])
   INFO:apache_beam.utils.subprocess_server:    at 
com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63)
   INFO:apache_beam.utils.subprocess_server:    at 
com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1728)
   INFO:apache_beam.utils.subprocess_server:    at 
com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1353)
   INFO:apache_beam.utils.subprocess_server:    at 
com.fasterxml.jackson.databind.deser.std.StdDeserializer._deserializeFromString(StdDeserializer.java:311)
   INFO:apache_beam.utils.subprocess_server:    at 
com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:444)
   INFO:apache_beam.utils.subprocess_server:    at 
com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:32)
   INFO:apache_beam.utils.subprocess_server:    at 
com.fasterxml.jackson.databind.deser.std.CollectionDeserializer._deserializeFromArray(CollectionDeserializer.java:355)
   INFO:apache_beam.utils.subprocess_server:    at 
com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:244)
   INFO:apache_beam.utils.subprocess_server:    at 
com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:28)
   INFO:apache_beam.utils.subprocess_server:    at 
com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:424)
   INFO:apache_beam.utils.subprocess_server:    at 
com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:32)
   INFO:apache_beam.utils.subprocess_server:    at 
org.apache.beam.sdk.options.PipelineOptionsFactory.deserializeNode(PipelineOptionsFactory.java:1807)
   INFO:apache_beam.utils.subprocess_server:    at 
org.apache.beam.sdk.options.ProxyInvocationHandler.getValueFromJson(ProxyInvocationHandler.java:584)
   INFO:apache_beam.utils.subprocess_server:    ... 20 more
   INFO:apache_beam.utils.subprocess_server:
   
   ```
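
The mismatch behind this trace can be sketched in a few lines of Python. This is only an illustration: `snake_to_camel` is a hypothetical stand-in for the name heuristic, not Beam's actual code, and the list value is taken from the string quoted in the `MismatchedInputException` message above.

```python
import json
import re


def snake_to_camel(name: str) -> str:
    """Hypothetical stand-in for the abc_def -> abcDef name heuristic."""
    return re.sub(r"_([a-z])", lambda m: m.group(1).upper(), name)


# Shape sent by the Python side, judging by the value quoted in the stack
# trace: a list whose elements are JSON-encoded strings.
python_value = [
    '{"apache_beam.runners.dataflow":"WARNING",'
    '"apache_beam.runners.dataflow.foo":"INFO"}'
]

# The translated name collides with the existing Java option.
java_name = snake_to_camel("sdk_harness_log_level_overrides")

# Shape a Map<String, String> option would need: one JSON object.
expected_shape = json.loads(python_value[0])

print(java_name)
print(type(python_value).__name__, "of", type(python_value[0]).__name__)
print(type(expected_shape).__name__)
```

The names match after translation, but the values do not: the Java side receives a list of strings where it expects a map.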
   
   This affects Python pipelines that use the `sdk_harness_log_level_overrides` 
option on the Flink, Spark, and Samza runners, whose job invokers call 
PipelineOptionsTranslation.fromProto.
   
   We should probably make the change on the Python side and ensure that when 
the option is passed to the proto it is compatible with Java, i.e., that it has 
a well-defined proto representation with a map value.
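
One possible shape for such a Python-side change is sketched below. It assumes the option value is a list of JSON-object strings, as in the trace above; `merge_log_level_overrides` is a hypothetical helper for illustration, not an existing Beam function, and where it would be called during option serialization is left open.

```python
import json
from typing import Dict, List


def merge_log_level_overrides(overrides: List[str]) -> Dict[str, str]:
    """Hypothetical helper: fold a list of JSON-object strings into one map.

    Later entries win when the same logger name appears more than once.
    """
    merged: Dict[str, str] = {}
    for item in overrides:
        parsed = json.loads(item)
        if not isinstance(parsed, dict):
            raise ValueError(f"Expected a JSON object, got: {item!r}")
        for logger_name, level in parsed.items():
            merged[str(logger_name)] = str(level)
    return merged


overrides = [
    '{"apache_beam.runners.dataflow":"WARNING"}',
    '{"apache_beam.runners.dataflow.foo":"INFO"}',
]
as_map = merge_log_level_overrides(overrides)
print(json.dumps(as_map))
```

Emitting a single JSON object like this would give the Java side a value that can be deserialized as a map, which is the compatibility the proposal asks for.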
   
   ### Issue Priority
   
   Priority: 2
   
   ### Issue Component
   
   Component: runner-flink

