[ https://issues.apache.org/jira/browse/FLINK-18637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161104#comment-17161104 ]
Yun Tang commented on FLINK-18637:
----------------------------------
[~oripwk] What is the max parallelism of your job? If your job does not set an
explicit max parallelism, what is the largest parallelism in your job?
If key group 13880 really exists and the max parallelism was not set
explicitly, the parallelism of the keyed operator would have to be at least
5462. Is that true for your job?
I'm asking to figure out whether key group 13880 could exist at all.
BTW, have you seen this behavior before, or can you reproduce it with a simple
job using fake data?
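The 5462 figure follows from how a default max parallelism is derived when none is set. Below is a minimal re-derivation, assuming the rule in Flink 1.10's `KeyGroupRangeAssignment.computeDefaultMaxParallelism` is: round `parallelism + parallelism / 2` up to a power of two, then clamp to [128, 32768]. The class and method names are hypothetical; this is illustrative arithmetic, not Flink code. Key groups are numbered 0 to maxParallelism - 1, so key group 13880 needs a max parallelism of at least 16384, which the default rule only yields from a parallelism of 5462 upward.

```java
// Hypothetical helper class: reimplements, for illustration only, the default
// max-parallelism rule as we understand it from Flink 1.10's KeyGroupRangeAssignment.
public class DefaultMaxParallelism {
    // Assumed bounds, matching Flink's defaults: 128 (lower) and 32768 (upper).
    static final int LOWER_BOUND = 1 << 7;
    static final int UPPER_BOUND = 1 << 15;

    // Smallest power of two that is >= x (for x >= 1).
    static int roundUpToPowerOfTwo(int x) {
        return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
    }

    // Default max parallelism: roundUpToPowerOfTwo(1.5 * p), clamped to [128, 32768].
    static int computeDefaultMaxParallelism(int parallelism) {
        int rounded = roundUpToPowerOfTwo(parallelism + parallelism / 2);
        return Math.min(Math.max(rounded, LOWER_BOUND), UPPER_BOUND);
    }

    // Key group k exists only if maxParallelism > k; find the smallest
    // operator parallelism whose default max parallelism allows that.
    static int minParallelismForKeyGroup(int keyGroup) {
        for (int p = 1; p <= UPPER_BOUND; p++) {
            if (computeDefaultMaxParallelism(p) > keyGroup) {
                return p;
            }
        }
        return -1; // no parallelism reaches this key group under the default rule
    }

    public static void main(String[] args) {
        System.out.println(computeDefaultMaxParallelism(5461)); // 8192
        System.out.println(computeDefaultMaxParallelism(5462)); // 16384
        System.out.println(minParallelismForKeyGroup(13880));   // 5462
    }
}
```

Because 5461 + 2730 = 8191 still rounds to 8192, the jump to 16384 (and thus to a key group as high as 13880) happens exactly at a parallelism of 5462.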
> Key group is not in KeyGroupRange
> ---------------------------------
>
> Key: FLINK-18637
> URL: https://issues.apache.org/jira/browse/FLINK-18637
> Project: Flink
> Issue Type: Bug
> Environment: Version: 1.10.0, Rev:<unknown>, Date:<unknown>
> OS current user: yarn
> Current Hadoop/Kerberos user: hadoop
> JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.141-b15
> Maximum heap size: 28960 MiBytes
> JAVA_HOME: /usr/java/jdk1.8.0_141/jre
> Hadoop version: 2.8.5-amzn-6
> JVM Options:
> -Xmx30360049728
> -Xms30360049728
> -XX:MaxDirectMemorySize=4429185024
> -XX:MaxMetaspaceSize=1073741824
> -XX:+UseG1GC
> -XX:+UnlockDiagnosticVMOptions
> -XX:+G1SummarizeConcMark
> -verbose:gc
> -XX:+PrintGCDetails
> -XX:+PrintGCDateStamps
> -XX:+UnlockCommercialFeatures
> -XX:+FlightRecorder
> -XX:+DebugNonSafepoints
> -XX:FlightRecorderOptions=defaultrecording=true,settings=/home/hadoop/heap.jfc,dumponexit=true,dumponexitpath=/var/lib/hadoop-yarn/recording.jfr,loglevel=info
> -Dlog.file=/var/log/hadoop-yarn/containers/application_1593935560662_0002/container_1593935560662_0002_01_000002/taskmanager.log
> -Dlog4j.configuration=file:./log4j.properties
> Program Arguments:
> -Dtaskmanager.memory.framework.off-heap.size=134217728b
> -Dtaskmanager.memory.network.max=1073741824b
> -Dtaskmanager.memory.network.min=1073741824b
> -Dtaskmanager.memory.framework.heap.size=134217728b
> -Dtaskmanager.memory.managed.size=23192823744b
> -Dtaskmanager.cpu.cores=7.0
> -Dtaskmanager.memory.task.heap.size=30225832000b
> -Dtaskmanager.memory.task.off-heap.size=3221225472b
> --configDir .
> -Djobmanager.rpc.address=ip-10-180-30-250.us-west-2.compute.internal
> -Dweb.port=0
> -Dweb.tmpdir=/tmp/flink-web-64f613cf-bf04-4a09-8c14-75c31b619574
> -Djobmanager.rpc.port=33739
> -Drest.address=ip-10-180-30-250.us-west-2.compute.internal
> Reporter: Ori Popowski
> Priority: Major
>
> I'm getting this error when creating a savepoint. I've read in
> https://issues.apache.org/jira/browse/FLINK-16193 that it's caused by an
> unstable hashCode or equals on the key, or by improper use of
> {{reinterpretAsKeyedStream}}.
>
> My key is a string and I don't use {{reinterpretAsKeyedStream}}.
>
> {code:scala}
> senv
>   .addSource(source)
>   .flatMap(…)
>   .filterWith { case (metadata, _, _) => … }
>   .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(…))
>   .keyingBy { case (meta, _) => meta.toPathString }
>   .process(new TruncateLargeSessions(config.sessionSizeLimit))
>   .keyingBy { case (meta, _) => meta.toPathString }
>   .window(EventTimeSessionWindows.withGap(Time.of(…)))
>   .process(new ProcessSession(sessionPlayback, config))
>   .addSink(sink)
> {code}
>
> {code:java}
> org.apache.flink.util.FlinkException: Triggering a savepoint for the job 962fc8e984e7ca1ed65a038aa62ce124 failed.
> at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:633)
> at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:611)
> at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
> at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:608)
> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:910)
> at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
> at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$triggerSavepoint$3(SchedulerBase.java:744)
> at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
> at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
> at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
> at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
> at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$null$1(CheckpointCoordinator.java:457)
> at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abort(PendingCheckpoint.java:429)
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpointDueToTaskFailure(CheckpointCoordinator.java:1466)
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1379)
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:719)
> at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$declineCheckpoint$5(SchedulerBase.java:807)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
> at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abort(PendingCheckpoint.java:428)
> ... 11 more
> Caused by: java.lang.Exception: Could not materialize checkpoint 15 for operator KeyedProcess (11/216).
> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180)
> ... 3 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Key group 13880 is not in KeyGroupRange{startKeyGroup=24, endKeyGroup=26}.
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
> at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
> ... 3 more
> Caused by: java.lang.IllegalArgumentException: Key group 13880 is not in KeyGroupRange{startKeyGroup=24, endKeyGroup=26}.
> at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
> at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
> at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:350)
> at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
> at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
> at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
> at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
> {code}
>
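The numbers in the final exception can be cross-checked against the operator's parallelism. A minimal sketch, assuming the job relies on the default max parallelism and that the range formula mirrors Flink's `KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex` as we understand it (the helper names here are hypothetical): subtask 11/216 is zero-based index 10, and with a max parallelism of 512 it owns key groups 24 to 26, which is consistent with the `KeyGroupRange{startKeyGroup=24, endKeyGroup=26}` in the trace. Under that assumption, valid key groups run only from 0 to 511, so 13880 would lie outside the valid range altogether.

```java
// Hypothetical helper: reimplements, for illustration only, how we understand
// Flink to derive a default max parallelism and split key groups across subtasks.
public class KeyGroupRangeCheck {

    // Smallest power of two that is >= x (for x >= 1).
    static int roundUpToPowerOfTwo(int x) {
        return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
    }

    // Assumed default rule: roundUpToPowerOfTwo(1.5 * p), clamped to [128, 32768].
    static int defaultMaxParallelism(int parallelism) {
        int rounded = roundUpToPowerOfTwo(parallelism + parallelism / 2);
        return Math.min(Math.max(rounded, 1 << 7), 1 << 15);
    }

    // Inclusive range [start, end] of key groups owned by a zero-based subtask index.
    static int[] keyGroupRange(int maxParallelism, int parallelism, int subtaskIndex) {
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int parallelism = 216; // from "KeyedProcess (11/216)"
        int maxParallelism = defaultMaxParallelism(parallelism);
        int[] range = keyGroupRange(maxParallelism, parallelism, 10); // subtask 11 is index 10
        System.out.println(maxParallelism);             // 512
        System.out.println(range[0] + ".." + range[1]); // 24..26
        // Valid key groups are 0 .. maxParallelism - 1.
        System.out.println(13880 < maxParallelism);     // false
    }
}
```

An explicitly configured max parallelism anywhere from 511 to 518 would produce the same 24..26 range for this subtask, so the match supports, but does not by itself prove, a max parallelism of 512.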
--
This message was sent by Atlassian Jira
(v8.3.4#803005)