Abacn opened a new issue, #23632:
URL: https://github.com/apache/beam/issues/23632
### What happened?
When running a Python pipeline with the Flink runner, FlinkJobInvoker tries to
translate the pipeline option names with the heuristic abc_def -> abcDef. For
sdk_harness_log_level_overrides, this yields sdkHarnessLogLevelOverrides,
which is an existing option in SdkHarnessOptions. However,
SdkHarnessOptions.sdkHarnessLogLevelOverrides is a Map<String, String>, while
sdk_harness_log_level_overrides is a list. The following exception is then
thrown when the job is invoked:
```
INFO:apache_beam.utils.subprocess_server:SEVERE: Encountered Unexpected
Exception for Preparation job_2a9da77a-4230-4232-aee0-546173e9b0e4
INFO:apache_beam.utils.subprocess_server:java.lang.RuntimeException: Unable
to parse representation
INFO:apache_beam.utils.subprocess_server: at
org.apache.beam.sdk.options.ProxyInvocationHandler.getValueFromJson(ProxyInvocationHandler.java:586)
INFO:apache_beam.utils.subprocess_server: at
org.apache.beam.sdk.options.ProxyInvocationHandler.getValueFromJson(ProxyInvocationHandler.java:579)
INFO:apache_beam.utils.subprocess_server: at
org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:219)
INFO:apache_beam.utils.subprocess_server: at
com.sun.proxy.$Proxy44.getSdkHarnessLogLevelOverrides(Unknown Source)
INFO:apache_beam.utils.subprocess_server: at
org.apache.beam.runners.flink.FlinkJobInvoker.setLogLevelsFromPipelineOption(FlinkJobInvoker.java:109)
INFO:apache_beam.utils.subprocess_server: at
org.apache.beam.runners.flink.FlinkJobInvoker.invokeWithExecutor(FlinkJobInvoker.java:79)
INFO:apache_beam.utils.subprocess_server: at
org.apache.beam.runners.jobsubmission.JobInvoker.invoke(JobInvoker.java:48)
INFO:apache_beam.utils.subprocess_server: at
org.apache.beam.runners.jobsubmission.InMemoryJobService.run(InMemoryJobService.java:246)
INFO:apache_beam.utils.subprocess_server: at
org.apache.beam.model.jobmanagement.v1.JobServiceGrpc$MethodHandlers.invoke(JobServiceGrpc.java:949)
INFO:apache_beam.utils.subprocess_server: at
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
INFO:apache_beam.utils.subprocess_server: at
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
INFO:apache_beam.utils.subprocess_server: at
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
INFO:apache_beam.utils.subprocess_server: at
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
INFO:apache_beam.utils.subprocess_server: at
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
INFO:apache_beam.utils.subprocess_server: at
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:354)
INFO:apache_beam.utils.subprocess_server: at
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
INFO:apache_beam.utils.subprocess_server: at
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
INFO:apache_beam.utils.subprocess_server: at
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
INFO:apache_beam.utils.subprocess_server: at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
INFO:apache_beam.utils.subprocess_server: at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
INFO:apache_beam.utils.subprocess_server: at
java.lang.Thread.run(Thread.java:750)
INFO:apache_beam.utils.subprocess_server:Caused by:
com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct
instance of `java.util.LinkedHashMap` (although at least one Creator exists):
no String-argument constructor/factory method to deserialize from String value
('{"apache_beam.runners.dataflow":"WARNING","apache_beam.runners.dataflow.foo":"INFO"}')
INFO:apache_beam.utils.subprocess_server: at [Source: UNKNOWN; byte offset:
#UNKNOWN] (through reference chain: java.util.ArrayList[0])
INFO:apache_beam.utils.subprocess_server: at
com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63)
INFO:apache_beam.utils.subprocess_server: at
com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1728)
INFO:apache_beam.utils.subprocess_server: at
com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1353)
INFO:apache_beam.utils.subprocess_server: at
com.fasterxml.jackson.databind.deser.std.StdDeserializer._deserializeFromString(StdDeserializer.java:311)
INFO:apache_beam.utils.subprocess_server: at
com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:444)
INFO:apache_beam.utils.subprocess_server: at
com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:32)
INFO:apache_beam.utils.subprocess_server: at
com.fasterxml.jackson.databind.deser.std.CollectionDeserializer._deserializeFromArray(CollectionDeserializer.java:355)
INFO:apache_beam.utils.subprocess_server: at
com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:244)
INFO:apache_beam.utils.subprocess_server: at
com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:28)
INFO:apache_beam.utils.subprocess_server: at
com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:424)
INFO:apache_beam.utils.subprocess_server: at
com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:32)
INFO:apache_beam.utils.subprocess_server: at
org.apache.beam.sdk.options.PipelineOptionsFactory.deserializeNode(PipelineOptionsFactory.java:1807)
INFO:apache_beam.utils.subprocess_server: at
org.apache.beam.sdk.options.ProxyInvocationHandler.getValueFromJson(ProxyInvocationHandler.java:584)
INFO:apache_beam.utils.subprocess_server: ... 20 more
INFO:apache_beam.utils.subprocess_server:
```
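For illustration, the abc_def -> abcDef name heuristic described above can be sketched in a few lines of Python. This is only a minimal sketch of the naming rule, not the actual Beam translation code; the function name `snake_to_camel` is hypothetical.

```python
def snake_to_camel(name: str) -> str:
    """Convert a snake_case option name to camelCase (illustrative only)."""
    head, *rest = name.split("_")
    # Upper-case the first character of every part after the first one.
    return head + "".join(part[:1].upper() + part[1:] for part in rest)


camel_name = snake_to_camel("sdk_harness_log_level_overrides")
print(camel_name)
```

The converted name matches the getter seen in the stack trace (`getSdkHarnessLogLevelOverrides`), which is why the Java side tries to read the Python value as a map.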
This affects Python pipelines that use the `sdk_harness_log_level_overrides`
option when running on the Flink, Spark, and Samza runners, whose job invokers
call PipelineOptionsTranslation.fromProto.
We should probably make the change on the Python side, to make sure that the
option is compatible with Java when it is passed to the proto, i.e., by having
a well-defined proto with a map value.
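As a rough sketch of what a Python-side fix could look like, the list of JSON strings (the shape visible in the exception message above) could be merged into a single mapping before it is serialized, so that the Java side receives something it can read as a Map<String, String>. The helper name `merge_log_level_overrides` is hypothetical, and where such a conversion would actually live in the SDK is an assumption, not something this issue specifies.

```python
import json
from typing import Dict, List


def merge_log_level_overrides(overrides: List[str]) -> Dict[str, str]:
    """Merge a list of JSON-object strings into one flat str -> str mapping.

    Hypothetical helper: later entries win when the same logger name repeats.
    """
    merged: Dict[str, str] = {}
    for entry in overrides:
        parsed = json.loads(entry)
        if not isinstance(parsed, dict):
            raise ValueError(f"Expected a JSON object, got: {entry!r}")
        merged.update({str(k): str(v) for k, v in parsed.items()})
    return merged


# The list-of-strings shape seen in the exception message above.
python_side_value = [
    '{"apache_beam.runners.dataflow":"WARNING",'
    '"apache_beam.runners.dataflow.foo":"INFO"}'
]
merged = merge_log_level_overrides(python_side_value)
print(json.dumps(merged))
```

A single JSON object like this is the shape a Jackson map deserializer expects, rather than a list containing a JSON-encoded string.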
### Issue Priority
Priority: 2
### Issue Component
Component: runner-flink