Youjun Yuan created FLINK-37684:
-----------------------------------

             Summary: ClassCastException: cannot assign instance of 
java.lang.invoke.SerializedLambda to field 
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.keySelector
                 Key: FLINK-37684
                 URL: https://issues.apache.org/jira/browse/FLINK-37684
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.14.4
            Reporter: Youjun Yuan


The Flink job failed to run (on k8s) with the error below.

The job has just 4 operators (see below). Interestingly, if I remove the 
second one (id=74, which is just a map operator), the job runs fine.

 

Topology (JSON execution plan):
{code:java}
{
  "nodes" : [ {
    "id" : 73,
    "type" : "Source: click-tracking-record-kafka_source",
    "pact" : "Data Source",
    "contents" : "Source: click-tracking-record-kafka_source",
    "parallelism" : 1
  }, {
    "id" : 74,
    "type" : "Proto-Deserialization",
    "pact" : "Operator",
    "contents" : "Proto-Deserialization",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 73,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 76,
    "type" : "KeyedProcess",
    "pact" : "Operator",
    "contents" : "KeyedProcess",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 74,
      "ship_strategy" : "HASH",
      "side" : "second"
    } ]
  }, {
    "id" : 79,
    "type" : "Sink uf-aggregate_-_to_kafka_-_click-tracking-record_-_as_-_int-dj-live-user-features-dev03",
    "pact" : "Operator",
    "contents" : "Sink uf-aggregate_-_to_kafka_-_click-tracking-record_-_as_-_int-dj-live-user-features-dev03",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 76,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}{code}
 

Full call stack:
{code:java}
Caused by: java.lang.ClassCastException: cannot assign instance of 
java.lang.invoke.SerializedLambda to field 
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.keySelector
 of type org.apache.flink.api.java.functions.KeySelector in instance of 
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner
    at 
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301)
 ~[?:1.8.0_322]
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431) 
~[?:1.8.0_322]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2437) 
~[?:1.8.0_322]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355) 
~[?:1.8.0_322]
    at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213) 
~[?:1.8.0_322]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669) 
~[?:1.8.0_322]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431) 
~[?:1.8.0_322]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355) 
~[?:1.8.0_322]
    at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213) 
~[?:1.8.0_322]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669) 
~[?:1.8.0_322]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503) 
~[?:1.8.0_322]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) 
~[?:1.8.0_322]
    at java.util.ArrayList.readObject(ArrayList.java:799) ~[?:1.8.0_322]
    at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source) ~[?:?]
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_322]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_322]
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184) 
~[?:1.8.0_322]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2322) 
~[?:1.8.0_322]
    at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213) 
~[?:1.8.0_322]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669) 
~[?:1.8.0_322]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503) 
~[?:1.8.0_322]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) 
~[?:1.8.0_322]
    at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at 
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at 
org.apache.flink.streaming.api.graph.StreamConfig.getNonChainedOutputs(StreamConfig.java:387)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at 
org.apache.flink.streaming.api.graph.StreamConfig.toString(StreamConfig.java:714)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at java.lang.String.valueOf(String.java:2994) ~[?:1.8.0_322]
    at java.lang.StringBuilder.append(StringBuilder.java:136) ~[?:1.8.0_322]
    at java.util.AbstractMap.toString(AbstractMap.java:559) ~[?:1.8.0_322]
    at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1421) 
~[?:1.8.0_322]
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
~[?:1.8.0_322]
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
~[?:1.8.0_322]
    at 
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at 
org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:548)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at 
org.apache.flink.streaming.api.graph.StreamConfig.setTransitiveChainedTaskConfigs(StreamConfig.java:495)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:479)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:377)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:178)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:116)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at 
org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:985)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at 
org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:50)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at 
org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:39)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at 
org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:56)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at 
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.submitAndGetJobClientFuture(EmbeddedExecutor.java:122)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at 
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.execute(EmbeddedExecutor.java:104)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2042)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:137)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1916)
 ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at 
com.thetradedesk.streaming.services.databahnjunction.DatabahnJunction.main(DatabahnJunction.java:56)
 ~[?:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_322]
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_322]
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_322]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_322]
    at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to