[ https://issues.apache.org/jira/browse/FLINK-24062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-24062: ----------------------------------- Labels: pull-request-available release-testing (was: release-testing) > Exception encountered during timer serialization in Python DataStream API > -------------------------------------------------------------------------- > > Key: FLINK-24062 > URL: https://issues.apache.org/jira/browse/FLINK-24062 > Project: Flink > Issue Type: Bug > Components: API / Python > Affects Versions: 1.14.0 > Reporter: Dian Fu > Assignee: Dian Fu > Priority: Major > Labels: pull-request-available, release-testing > Fix For: 1.14.0 > > > For the following example: > {code} > ################################################################################ > # Licensed to the Apache Software Foundation (ASF) under one > # or more contributor license agreements. See the NOTICE file > # distributed with this work for additional information > # regarding copyright ownership. The ASF licenses this file > # to you under the Apache License, Version 2.0 (the > # "License"); you may not use this file except in compliance > # with the License. You may obtain a copy of the License at > # > # http://www.apache.org/licenses/LICENSE-2.0 > # > # Unless required by applicable law or agreed to in writing, software > # distributed under the License is distributed on an "AS IS" BASIS, > # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > # See the License for the specific language governing permissions and > # limitations under the License. > ################################################################################ > import argparse > import logging > import sys > from pyflink.common import WatermarkStrategy, Encoder, Types > from pyflink.datastream import StreamExecutionEnvironment, > RuntimeExecutionMode > from pyflink.datastream.connectors import (FileSource, StreamFormat, > FileSink, OutputFileConfig, > RollingPolicy) > word_count_data = ["To be, or not to be,--that is the question:--", > "Whether 'tis nobler in the mind to suffer", > "The slings and arrows of outrageous fortune", > "Or to take arms against a sea of troubles,", > "And by opposing end them?--To die,--to sleep,--", > "No more; and by a sleep to say we end", > "The heartache, and the thousand natural shocks", > "That flesh is heir to,--'tis a consummation", > "Devoutly to be wish'd. To die,--to sleep;--", > "To sleep! perchance to dream:--ay, there's the rub;", > "For in that sleep of death what dreams may come,", > "When we have shuffled off this mortal coil,", > "Must give us pause: there's the respect", > "That makes calamity of so long life;", > "For who would bear the whips and scorns of time,", > "The oppressor's wrong, the proud man's contumely,", > "The pangs of despis'd love, the law's delay,", > "The insolence of office, and the spurns", > "That patient merit of the unworthy takes,", > "When he himself might his quietus make", > "With a bare bodkin? who would these fardels bear,", > "To grunt and sweat under a weary life,", > "But that the dread of something after death,--", > "The undiscover'd country, from whose bourn", > "No traveller returns,--puzzles the will,", > "And makes us rather bear those ills we have", > "Than fly to others that we know not of?", > "Thus conscience does make cowards of us all;", > "And thus the native hue of resolution", > "Is sicklied o'er with the pale cast of thought;", > "And enterprises of great pith and moment,", > "With this regard, their currents turn awry,", > "And lose the name of action.--Soft you now!", > "The fair Ophelia!--Nymph, in thy orisons", > "Be all my sins remember'd."] > def word_count(input_path, output_path): > env = StreamExecutionEnvironment.get_execution_environment() > env.set_runtime_mode(RuntimeExecutionMode.BATCH) > # write all the data to one file > env.set_parallelism(1) > # define the source > if input_path is not None: > ds = env.from_source( > > source=FileSource.for_record_stream_format(StreamFormat.text_line_format(), > input_path) > .process_static_file_set().build(), > watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(), > source_name="file_source" > ) > else: > print("Executing word_count example with default input data set.") > print("Use --input to specify file input.") > ds = env.from_collection(word_count_data) > def split(line): > yield from line.split() > # compute word count > ds = ds.flat_map(split) \ > .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), > Types.INT()])) \ > .key_by(lambda i: i[0]) \ > .reduce(lambda i, j: (i[0], i[1] + j[1])) > # define the sink > if output_path is not None: > ds.sink_to( > sink=FileSink.for_row_format( > base_path=output_path, > encoder=Encoder.simple_string_encoder()) > .with_output_file_config( > OutputFileConfig.builder() > .with_part_prefix("prefix") > .with_part_suffix(".ext") > .build()) > .with_rolling_policy(RollingPolicy.default_rolling_policy()) > .build() > ) > else: > print("Printing result to stdout. Use --output to specify output > path.") > ds.print() > # submit for execution > env.execute() > if __name__ == '__main__': > logging.basicConfig(stream=sys.stdout, level=logging.INFO, > format="%(message)s") > parser = argparse.ArgumentParser() > parser.add_argument( > '--input', > dest='input', > required=False, > help='Input file to process.') > parser.add_argument( > '--output', > dest='output', > required=False, > help='Output file to write results to.') > argv = sys.argv[1:] > known_args, _ = parser.parse_known_args(argv) > word_count(known_args.input, known_args.output) > {code} > It will throw the following exception: > {code} > Traceback (most recent call last): > File "pyflink/examples/datastream/word_count.py", line 134, in <module> > word_count(known_args.input, known_args.output) > File "pyflink/examples/datastream/word_count.py", line 113, in word_count > env.execute() > File > "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/datastream/stream_execution_environment.py", > line 691, in execute > return > JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph)) > File > "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/py4j/java_gateway.py", > line 1286, in __call__ > answer, self.gateway_client, self.target_id, self.name) > File > "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/util/exceptions.py", > line 146, in deco > return f(*a, **kw) > File > "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/py4j/protocol.py", > line 328, in get_return_value > format(target_id, ".", name), value) > py4j.protocol.Py4JJavaError: An error occurred while calling o3.execute. > : org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) > at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137) > at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) > at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) > at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:250) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) > at > org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389) > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93) > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) > at > org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47) > at akka.dispatch.OnComplete.internal(Future.scala:300) > at akka.dispatch.OnComplete.internal(Future.scala:297) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) > at > org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65) > at > scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) > at > scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) > at > scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) > at > scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) > at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621) > at > akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24) > at > akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23) > at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532) > at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29) > at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) > at > akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63) > at > akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100) > at > scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) > at > scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81) > at > akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) > at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) > at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) > at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172) > Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by > NoRestartBackoffTimeStrategy > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209) > at > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679) > at > org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79) > at > org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:441) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316) > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) > at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) > at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > at akka.actor.Actor.aroundReceive(Actor.scala:537) > at akka.actor.Actor.aroundReceive$(Actor.scala:535) > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) > at akka.actor.ActorCell.invoke(ActorCell.scala:548) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) > at akka.dispatch.Mailbox.run(Mailbox.scala:231) > at akka.dispatch.Mailbox.exec(Mailbox.scala:243) > ... 4 more > Caused by: java.lang.RuntimeException: Error while waiting for > BeamPythonFunctionRunner flush > at > org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:361) > at > org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.processElementsOfCurrentKeyIfNeeded(AbstractPythonFunctionOperator.java:257) > at > org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.setCurrentKey(AbstractPythonFunctionOperator.java:246) > at > org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.setCurrentKey(PythonKeyedProcessOperator.java:225) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:504) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:491) > at > org.apache.flink.streaming.api.operators.OneInputStreamOperator.setKeyContextElement(OneInputStreamOperator.java:36) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:229) > at > org.apache.flink.streaming.api.operators.sort.SortingDataInput.emitNextSortedRecord(SortingDataInput.java:207) > at > org.apache.flink.streaming.api.operators.sort.SortingDataInput.emitNext(SortingDataInput.java:187) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:489) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:819) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:746) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:785) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:728) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:786) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:572) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.RuntimeException: Failed to close remote bundle > at > org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:377) > at > org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:361) > at > org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$invokeFinishBundle$2(AbstractPythonFunctionOperator.java:340) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > ... 1 more > Caused by: java.util.concurrent.ExecutionException: > org.apache.beam.sdk.coders.CoderException: java.io.EOFException > at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) > at > org.apache.beam.sdk.fn.data.CompletableFutureInboundDataClient.awaitCompletion(CompletableFutureInboundDataClient.java:48) > at > org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:73) > at > org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:542) > at > org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555) > at > org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:375) > ... 7 more > Caused by: org.apache.beam.sdk.coders.CoderException: java.io.EOFException > at > org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:104) > at > org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:90) > at > org.apache.beam.runners.core.construction.Timer$Coder.decode(Timer.java:195) > at > org.apache.beam.runners.core.construction.Timer$Coder.decode(Timer.java:158) > at > org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:43) > at > org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:25) > at > org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:65) > at > org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:29) > at > org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:178) > at > org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:127) > at > org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251) > at > org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) > at > org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) > at > org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309) > at > org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292) > at > org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782) > at > org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) > at > org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) > ... 3 more > Caused by: java.io.EOFException > at org.apache.beam.sdk.util.VarInt.decodeLong(VarInt.java:73) > at org.apache.beam.sdk.util.VarInt.decodeInt(VarInt.java:56) > at > org.apache.beam.sdk.coders.StringUtf8Coder.readString(StringUtf8Coder.java:55) > at > org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:100) > ... 20 more > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)