[
https://issues.apache.org/jira/browse/FLINK-35639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matthias Pohl closed FLINK-35639.
---------------------------------
Resolution: Not A Problem
I'm closing the issue and the related PRs because we don't actually support
this kind of version upgrade in general. Fixing the {{RestartStrategy}} issue
wouldn't necessarily solve the broader problem.
> upgrade to 1.19 with job in HA state with restart strategy crashes job manager
> ------------------------------------------------------------------------------
>
> Key: FLINK-35639
> URL: https://issues.apache.org/jira/browse/FLINK-35639
> Project: Flink
> Issue Type: Bug
> Components: API / Core
> Affects Versions: 1.20.0, 1.19.1
>         Environment: Flink 1.18.1 and 1.19.0 binary releases with ZooKeeper HA
> (full reproduction steps are in the description below)
> Reporter: yazgoo
> Assignee: Matthias Pohl
> Priority: Blocker
> Labels: pull-request-available
>
> When trying to upgrade a Flink cluster from 1.18 to 1.19 with a 1.18 job in
> ZooKeeper HA state, the JobManager crashes with a ClassCastException; see the
> log below.
>
> {code:java}
> 2024-06-18 16:58:14,401 ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error
> occurred in the cluster entrypoint. org.apache.flink.util.FlinkException:
> JobMaster for job 5f0898c964a93a47aa480427f3e2c6c0 failed. at
> org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:1484)
> ~[flink-dist-1.19.0.jar:1.19.0] at
> org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:775)
> ~[flink-dist-1.19.0.jar:1.19.0] at
> org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:738)
> ~[flink-dist-1.19.0.jar:1.19.0] at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$7(Dispatcher.java:693)
> ~[flink-dist-1.19.0.jar:1.19.0] at
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
> ~[?:?] at
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
> ~[?:?] at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
> ~[?:?] at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)
> ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> ~[flink-dist-1.19.0.jar:1.19.0] at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)
> ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)
> ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at
> org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
> ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
> ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at
> scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at
> scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at
> org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at
> org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at
> org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at
> org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at
> org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at
> org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at
> org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at
> org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at
> org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) [?:?] at
> java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
> [?:?] at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
> [?:?] at
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) [?:?]
> at
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
> [?:?] Caused by: org.apache.flink.runtime.client.JobInitializationException:
> Could not start the JobMaster. at
> org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
> ~[flink-dist-1.19.0.jar:1.19.0] at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
> ~[?:?] at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
> ~[?:?] at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
> ~[?:?] at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
> ~[?:?] at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
> ~[?:?] at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
> ~[?:?] at java.lang.Thread.run(Thread.java:840) ~[?:?] Caused by:
> java.util.concurrent.CompletionException: java.lang.ClassCastException:
> cannot assign instance of org.apache.flink.api.common.time.Time to field
> org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration.delayBetweenAttemptsInterval
> of type java.time.Duration in instance of
> org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration
> at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
> ~[?:?] at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
> ~[?:?] at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)
> ~[?:?] at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
> ~[?:?] at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
> ~[?:?] at java.lang.Thread.run(Thread.java:840) ~[?:?] Caused by:
> java.lang.ClassCastException: cannot assign instance of
> org.apache.flink.api.common.time.Time to field
> org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration.delayBetweenAttemptsInterval
> of type java.time.Duration in instance of
> org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration
> at
> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2096)
> ~[?:?] at
> java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2060)
> ~[?:?] at
> java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1347)
> ~[?:?] at
> java.io.ObjectInputStream$FieldValues.defaultCheckFieldValues(ObjectInputStream.java:2679)
> ~[?:?] at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2486) ~[?:?]
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
> ~[?:?] at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733) ~[?:?]
> at java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
> ~[?:?] at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457) ~[?:?]
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
> ~[?:?] at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733) ~[?:?]
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:509) ~[?:?]
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:467) ~[?:?]
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539)
> ~[flink-dist-1.19.0.jar:1.19.0] at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527)
> ~[flink-dist-1.19.0.jar:1.19.0] at
> org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
> ~[flink-dist-1.19.0.jar:1.19.0] at
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:102)
> ~[flink-dist-1.19.0.jar:1.19.0] at
> org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:121)
> ~[flink-dist-1.19.0.jar:1.19.0] at
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:379)
> ~[flink-dist-1.19.0.jar:1.19.0] at
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:356)
> ~[flink-dist-1.19.0.jar:1.19.0] at
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128)
> ~[flink-dist-1.19.0.jar:1.19.0] at
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100)
> ~[flink-dist-1.19.0.jar:1.19.0] at
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
> ~[flink-dist-1.19.0.jar:1.19.0] at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
> ~[?:?] at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
> ~[?:?] at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
> ~[?:?] at java.lang.Thread.run(Thread.java:840) ~[?:?] 2024-06-18
> 16:58:14,403 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint
> [] - Shutting StandaloneSessionClusterEntrypoint down with application
> status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
> 2024-06-18 16:58:14,404 INFO org.apache.flink.runtime.blob.BlobServer
> [] - Stopped BLOB server at 127.0.0.1:40067 2024-06-18
> 16:58:14,431 INFO
> org.apache.flink.shaded.curator5.org.apache.curator.utils.Compatibility [] -
> Using org.apache.zookeeper.server.quorum.MultipleAddresses
> {code}
> *Reproducing*
> Download 1.18 and 1.19 binary releases.
> Add the following to flink-1.19.0/conf/config.yaml and
> flink-1.18.1/conf/flink-conf.yaml
>
> {code:java}
> high-availability: zookeeper
> high-availability.zookeeper.quorum: localhost
> high-availability.storageDir: file:///tmp/flink/recovery {code}
> Launch zookeeper:
> {code:java}
> docker run --network host zookeeper:latest{code}
> launch 1.18 job manager:
> {code:java}
> ./flink-1.18.1/bin/jobmanager.sh start-foreground{code}
> launch 1.18 task manager:
> {code:java}
> ./flink-1.18.1/bin/taskmanager.sh start-foreground{code}
> create the following job:
> {code:java}
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.util.Collector;
> import org.apache.flink.api.common.restartstrategy.RestartStrategies;
> import org.apache.flink.api.common.time.Time;
> import java.util.concurrent.TimeUnit;
>
> public class FlinkJob {
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         env.setRestartStrategy(
>                 RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.of(20, TimeUnit.SECONDS)));
>         env.fromElements("Hello World", "Hello Flink")
>                 .flatMap(new LineSplitter())
>                 .groupBy(0)
>                 .sum(1)
>                 .print();
>     }
>
>     public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
>             for (String word : value.split(" ")) {
>                 try {
>                     // Sleep so the job is still running when the cluster is killed.
>                     Thread.sleep(120000);
>                 } catch (InterruptedException e) {
>                     e.printStackTrace();
>                 }
>                 out.collect(new Tuple2<>(word, 1));
>             }
>         }
>     }
> }
> {code}
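> Note: in 1.19 the restart strategy can also be supplied through the cluster
> configuration instead of {{env.setRestartStrategy(...)}}. As an untested
> workaround sketch, this keeps a {{FixedDelayRestartStrategyConfiguration}}
> (with its 1.18 {{Time}} field) out of the job's serialized execution config:
> ```yaml
> restart-strategy.type: fixed-delay
> restart-strategy.fixed-delay.attempts: 2147483647
> restart-strategy.fixed-delay.delay: 20 s
> ```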
> pom.xml
> {code:xml}
> <project xmlns="http://maven.apache.org/POM/4.0.0"
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
> http://maven.apache.org/xsd/maven-4.0.0.xsd">
> <modelVersion>4.0.0</modelVersion>
> <groupId>org.apache.flink</groupId>
> <artifactId>myflinkjob</artifactId>
> <version>1.0-SNAPSHOT</version>
> <properties>
> <flink.version>1.18.1</flink.version>
> <java.version>1.8</java.version>
> </properties>
> <dependencies>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-java</artifactId>
> <version>${flink.version}</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-streaming-java</artifactId>
> <version>${flink.version}</version>
> </dependency>
> </dependencies>
> <build>
> <plugins>
> <plugin>
> <groupId>org.apache.maven.plugins</groupId>
> <artifactId>maven-compiler-plugin</artifactId>
> <version>3.8.1</version>
> <configuration>
> <source>${java.version}</source>
> <target>${java.version}</target>
> </configuration>
> </plugin>
> <plugin>
> <groupId>org.apache.maven.plugins</groupId>
> <artifactId>maven-jar-plugin</artifactId>
> <version>3.1.0</version>
> <configuration>
> <archive>
> <manifest>
> <addClasspath>true</addClasspath>
> <classpathPrefix>lib/</classpathPrefix>
> <mainClass>FlinkJob</mainClass>
> </manifest>
> </archive>
> </configuration>
> </plugin>
> </plugins>
> </build>
> </project>
> {code}
> Launch job:
> {code:java}
> ./flink-1.18.1/bin/flink run ../flink-job/target/myflinkjob-1.0-SNAPSHOT.jar
> Job has been submitted with JobID 5f0898c964a93a47aa480427f3e2c6c0{code}
> Kill job manager and task manager.
> Then launch the 1.19.0 job manager:
> {code:java}
> ./flink-1.19.0/bin/jobmanager.sh start-foreground{code}
> The JobManager will crash with the stack trace above.
> *Root cause*
> It looks like the type of {{delayBetweenAttemptsInterval}} was changed from
> {{Time}} to {{java.time.Duration}} in 1.19
> [https://github.com/apache/flink/pull/22984/files#diff-d174f32ffdea69de610c4f37c545bd22a253b9846434f83397f1bbc2aaa399faR239]
> , introducing a serialization incompatibility that Flink 1.19 does not handle.
> In my opinion, the JobManager should not crash on startup in that case.
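> The failure mode can be illustrated with a small, self-contained sketch
> ({{FieldTypeDemo}} and {{ConfigV1}} are hypothetical stand-ins, not Flink
> classes): Java serialization records each field's declared type in the stream
> descriptor, and the reading JVM checks it against its own class definition
> even when {{serialVersionUID}} matches. That check is what turns the
> {{Time}}-to-{{Duration}} field change into the ClassCastException above.
> ```java
> import java.io.ObjectStreamClass;
> import java.io.ObjectStreamField;
> import java.io.Serializable;
>
> public class FieldTypeDemo {
>     // Hypothetical stand-in for FixedDelayRestartStrategyConfiguration;
>     // the real 1.18 field was of type org.apache.flink.api.common.time.Time.
>     static class ConfigV1 implements Serializable {
>         private static final long serialVersionUID = 1L;
>         Long delayBetweenAttemptsInterval = 20_000L; // stand-in for the Time field
>     }
>
>     public static void main(String[] args) {
>         ObjectStreamClass desc = ObjectStreamClass.lookup(ConfigV1.class);
>         for (ObjectStreamField f : desc.getFields()) {
>             // The declared type shown here is what a 1.19 JobManager compares
>             // against its own class definition when reading 1.18 HA state.
>             System.out.println(f.getName() + " : " + f.getType().getName());
>             // prints: delayBetweenAttemptsInterval : java.lang.Long
>         }
>     }
> }
> ```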
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)