[
https://issues.apache.org/jira/browse/FLINK-27395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Natan HP updated FLINK-27395:
-----------------------------
Description:
I got this exception on the Flink TaskManager, but I can see that the data is
successfully published to Pub/Sub. Here is the log:
{noformat}
2022-04-25 07:53:44,293 INFO org.apache.flink.runtime.taskmanager.Task
[] - Sink: Pub-sub-sink (1/2)#0 (cffc263c60c6c0c8c56092a8386601eb) switched
from INITIALIZING to RUNNING.
Apr 25, 2022 7:53:45 AM io.grpc.internal.ManagedChannelImpl$1 uncaughtException
SEVERE: [Channel<1>: (pubsub.googleapis.com:443)] Uncaught exception in the
SynchronizationContext. Panic!
java.lang.IllegalStateException: Could not find policy 'pick_first'. Make sure
its implementation is either registered to LoadBalancerRegistry or included in
META-INF/services/io.grpc.LoadBalancerProvider from your jar files.
at
io.grpc.internal.AutoConfiguredLoadBalancerFactory$AutoConfiguredLoadBalancer.<init>(AutoConfiguredLoadBalancerFactory.java:94)
at
io.grpc.internal.AutoConfiguredLoadBalancerFactory.newLoadBalancer(AutoConfiguredLoadBalancerFactory.java:65)
at
io.grpc.internal.ManagedChannelImpl.exitIdleMode(ManagedChannelImpl.java:375)
at
io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider$1ExitIdleModeForTransport.run(ManagedChannelImpl.java:469)
at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
at
io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider.get(ManagedChannelImpl.java:473)
at io.grpc.internal.ClientCallImpl.startInternal(ClientCallImpl.java:253)
at io.grpc.internal.ClientCallImpl.start(ClientCallImpl.java:210)
at io.grpc.ForwardingClientCall.start(ForwardingClientCall.java:32)
at
com.google.api.gax.grpc.GrpcHeaderInterceptor$1.start(GrpcHeaderInterceptor.java:94)
at io.grpc.stub.ClientCalls.startCall(ClientCalls.java:314)
at io.grpc.stub.ClientCalls.asyncUnaryRequestCall(ClientCalls.java:288)
at io.grpc.stub.ClientCalls.futureUnaryCall(ClientCalls.java:200)
at
com.google.api.gax.grpc.GrpcDirectCallable.futureCall(GrpcDirectCallable.java:58)
at
com.google.api.gax.grpc.GrpcUnaryRequestParamCallable.futureCall(GrpcUnaryRequestParamCallable.java:65)
at
com.google.api.gax.grpc.GrpcExceptionCallable.futureCall(GrpcExceptionCallable.java:64)
at com.google.api.gax.rpc.AttemptCallable.call(AttemptCallable.java:86)
at
com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:63)
at
com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:41)
at
com.google.api.gax.tracing.TracedBatchingCallable.futureCall(TracedBatchingCallable.java:82)
at
com.google.api.gax.rpc.BatchingCallable.futureCall(BatchingCallable.java:79)
at com.google.api.gax.rpc.UnaryCallable$1.futureCall(UnaryCallable.java:126)
at com.google.api.gax.rpc.UnaryCallable.futureCall(UnaryCallable.java:87)
at com.google.cloud.pubsub.v1.Publisher.publishCall(Publisher.java:425)
at
com.google.cloud.pubsub.v1.Publisher.publishOutstandingBatch(Publisher.java:471)
at
com.google.cloud.pubsub.v1.Publisher.publishAllWithoutInflight(Publisher.java:399)
at com.google.cloud.pubsub.v1.Publisher.access$1100(Publisher.java:88)
at com.google.cloud.pubsub.v1.Publisher$2.run(Publisher.java:326)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
{noformat}
The code sample:
{code:java}
SinkFunction<String> pubsubSink = PubSubSink.newBuilder()
        .withSerializationSchema((SerializationSchema<String>) s ->
                s.getBytes(StandardCharsets.UTF_8))
        .withProjectName("<project-name>")
        .withTopicName("<topic-name>")
        .build();
dataStream.addSink(pubsubSink)
        .name("Pub-sub-sink");
{code}
I use Maven Assembly Plugin to create the uber JAR:
{noformat}
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-assembly-plugin</artifactId>
  <version>2.6</version>
  <configuration>
    <archive>
      <manifest>
        <mainClass>org.example.flink.Main</mainClass>
      </manifest>
    </archive>
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
  </configuration>
  <executions>
    <execution>
      <id>make-assembly</id>
      <phase>package</phase>
      <goals>
        <goal>single</goal>
      </goals>
    </execution>
  </executions>
</plugin>
{noformat}
The content of the JAR:
{noformat}
➜ jar tf MyApp.jar | grep io.grpc.LoadBalancerProvider
io/grpc/LoadBalancerProvider$UnknownConfig.class
META-INF/services/io.grpc.LoadBalancerProvider
io/grpc/LoadBalancerProvider.class
➜ jar tf MyApp.jar | grep io.grpc.NameResolverProvider
io/grpc/NameResolverProvider.class
META-INF/services/io.grpc.NameResolverProvider
{noformat}
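The `jar tf` listing only shows that a `META-INF/services/io.grpc.LoadBalancerProvider` entry exists, not which provider classes it names. When several dependencies ship a file with the same name, an uber JAR may end up with only one copy. The following is a minimal JDK-only sketch that prints every copy visible to a class loader, the providers each copy lists, and whether each provider class can be loaded. The class `ServiceFileInspector` and its method names are hypothetical and not part of Flink or gRPC.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only.
class ServiceFileInspector {

    /** Parses service-file text: one class name per line, '#' starts a comment. */
    static List<String> parseProviders(String content) {
        List<String> providers = new ArrayList<>();
        for (String line : content.split("\\R")) {
            int hash = line.indexOf('#');
            String name = (hash >= 0 ? line.substring(0, hash) : line).trim();
            if (!name.isEmpty()) {
                providers.add(name);
            }
        }
        return providers;
    }

    /** Maps every visible copy of META-INF/services/<serviceName> to the providers it lists. */
    static Map<String, List<String>> providersByLocation(String serviceName, ClassLoader loader) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        try {
            Enumeration<URL> urls = loader.getResources("META-INF/services/" + serviceName);
            while (urls.hasMoreElements()) {
                URL url = urls.nextElement();
                try (InputStream in = url.openStream()) {
                    String content = new String(readAll(in), StandardCharsets.UTF_8);
                    result.put(url.toString(), parseProviders(content));
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    /** Returns true if the named class can be located through the given class loader. */
    static boolean isLoadable(String className, ClassLoader loader) {
        try {
            // initialize=false: only locate the class, do not run static initializers.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // Java 8 compatible replacement for InputStream.readAllBytes().
    private static byte[] readAll(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[4096];
        int n;
        while ((n = in.read(buffer)) != -1) {
            out.write(buffer, 0, n);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        ClassLoader loader = ServiceFileInspector.class.getClassLoader();
        providersByLocation("io.grpc.LoadBalancerProvider", loader).forEach((location, providers) -> {
            System.out.println(location);
            for (String provider : providers) {
                System.out.println("  " + provider + " loadable=" + isLoadable(provider, loader));
            }
        });
    }
}
```

Running it with `MyApp.jar` on the classpath shows whether a `pick_first` provider is actually listed in the packaged service file and can be loaded from that classpath. Inside a Flink task the user-code class loader may differ from the one used in this sketch.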
What I've tried to solve this:
# Downgrading version to 1.14.2
{noformat}
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-gcp-pubsub_2.12</artifactId>
<version>1.14.2</version>
</dependency>{noformat}
# Using the Maven Shade Plugin (alongside the Maven Assembly Plugin) with the following
config, as suggested [here|#issuecomment-474739796]:
{noformat}
<transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
  <resource>META-INF/services</resource>
  <file>io.grpc.LoadBalancerProvider</file>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
  <resource>META-INF/services</resource>
  <file>io.grpc.NameResolverProvider</file>
</transformer>
{noformat}
# Creating files inside META-INF/services as suggested in
[here|https://github.com/googleapis/google-cloud-java/issues/4700#issuecomment-477658832]:
{noformat}
Data-Feeder-Simulation/FlinkSample/src/main/resources/META-INF/services on master [⇡!+?]
➜ ls
io.grpc.LoadBalancerProvider
io.grpc.NameResolverProvider
Data-Feeder-Simulation/FlinkSample/src/main/resources/META-INF/services on master [⇡!+?]
➜ cat io.grpc.LoadBalancerProvider
io.grpc.internal.PickFirstLoadBalancerProvider
Data-Feeder-Simulation/FlinkSample/src/main/resources/META-INF/services on master [⇡!+?]
➜ cat io.grpc.NameResolverProvider
io.grpc.internal.DnsNameResolverProvider
{noformat}
# Upgrading to 1.15.0
{noformat}
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-gcp-pubsub</artifactId>
<version>1.15.0</version>
</dependency>{noformat}
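Attempts 2 and 3 above both try to get the gRPC provider entries into `META-INF/services` of the uber JAR. What an uber JAR generally needs is the union of the provider lines from every dependency's service file, rather than one file replacing another. The following is a minimal JDK-only sketch of such a merge. The class `ServiceFileMerger` and the provider name `com.example.OtherLoadBalancerProvider` are hypothetical and serve only as an illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration only.
class ServiceFileMerger {

    /** Returns the distinct provider names from all given service files, in first-seen order. */
    static List<String> merge(List<String> serviceFileContents) {
        Set<String> merged = new LinkedHashSet<>();
        for (String content : serviceFileContents) {
            for (String line : content.split("\\R")) {
                String name = line.trim();
                // Skip blank lines and whole-line comments.
                if (!name.isEmpty() && !name.startsWith("#")) {
                    merged.add(name);
                }
            }
        }
        return new ArrayList<>(merged);
    }

    public static void main(String[] args) {
        String fromGrpcCore = "io.grpc.internal.PickFirstLoadBalancerProvider\n";
        String fromOtherJar = "com.example.OtherLoadBalancerProvider\n"; // hypothetical entry
        // Overwriting would keep only one of the two files; merging keeps both entries.
        System.out.println(merge(Arrays.asList(fromGrpcCore, fromOtherJar)));
    }
}
```

The Maven Shade Plugin documents a `ServicesResourceTransformer` that merges `META-INF/services` files in this way. Whether using it resolves this particular issue has not been verified here.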
[UPDATE 09 June 2022]:
I confirm that this exception causes the checkpoint to fail:
{noformat}
java.io.IOException: Could not perform checkpoint 1 for operator Sink:
Pub-sub-sink (2/2)#2.
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1210)
at
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
at
org.apache.flink.streaming.runtime.io.checkpointing.AlternatingWaitingForFirstBarrierUnaligned.barrierReceived(AlternatingWaitingForFirstBarrierUnaligned.java:78)
at
org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:55)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
at
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
at
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not
complete snapshot 1 for operator Sink: Pub-sub-sink (2/2)#2. Failure reason:
Checkpoint was declined.
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:227)
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:212)
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:647)
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:320)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$12(StreamTask.java:1253)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1241)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1198)
... 22 more
Caused by: java.lang.RuntimeException: Failed trying to publish message
at
org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink$FailureHandler.onFailure(PubSubSink.java:342)
at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
at
com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1050)
at
org.apache.flink.util.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
at
com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1176)
at
com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969)
at
com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760)
at
com.google.api.core.AbstractApiFuture$InternalSettableFuture.setException(AbstractApiFuture.java:95)
at
com.google.api.core.AbstractApiFuture.setException(AbstractApiFuture.java:77)
at
com.google.api.core.SettableApiFuture.setException(SettableApiFuture.java:52)
at
com.google.cloud.pubsub.v1.Publisher$OutstandingBatch.onFailure(Publisher.java:519)
at
com.google.cloud.pubsub.v1.Publisher$OutstandingBatch.access$1500(Publisher.java:486)
at com.google.cloud.pubsub.v1.Publisher$3.onFailure(Publisher.java:462)
at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
at
com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1050)
at
com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
at
com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1176)
at
com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969)
at
com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760)
at
com.google.api.gax.retrying.BasicRetryingFuture.handleAttempt(BasicRetryingFuture.java:179)
at
com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.handle(CallbackChainRetryingFuture.java:135)
at
com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.run(CallbackChainRetryingFuture.java:117)
at
com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
at
com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1176)
at
com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969)
at
com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760)
at
com.google.api.core.AbstractApiFuture$InternalSettableFuture.setException(AbstractApiFuture.java:95)
at
com.google.api.core.AbstractApiFuture.setException(AbstractApiFuture.java:77)
at
com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
at
com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1050)
at
com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
at
com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1176)
at
com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969)
at
com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760)
at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:545)
at
io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:515)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:426)
at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66)
at
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:689)
at
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$900(ClientCallImpl.java:577)
at
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:751)
at
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:740)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown
Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
Source)
... 1 more
Caused by: com.google.api.gax.rpc.InternalException:
io.grpc.StatusRuntimeException: INTERNAL: Panic! This is a bug!
at
com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:67)
at
com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
at
com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
... 23 more
Caused by: io.grpc.StatusRuntimeException: INTERNAL: Panic! This is a bug!
at io.grpc.Status.asRuntimeException(Status.java:533)
... 15 more
Caused by: java.lang.IllegalStateException: Could not find policy 'pick_first'.
Make sure its implementation is either registered to LoadBalancerRegistry or
included in META-INF/services/io.grpc.LoadBalancerProvider from your jar files.
at
io.grpc.internal.AutoConfiguredLoadBalancerFactory$AutoConfiguredLoadBalancer.<init>(AutoConfiguredLoadBalancerFactory.java:94)
at
io.grpc.internal.AutoConfiguredLoadBalancerFactory.newLoadBalancer(AutoConfiguredLoadBalancerFactory.java:65)
at
io.grpc.internal.ManagedChannelImpl.exitIdleMode(ManagedChannelImpl.java:375)
at
io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider$1ExitIdleModeForTransport.run(ManagedChannelImpl.java:469)
at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
at
io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider.get(ManagedChannelImpl.java:473)
at io.grpc.internal.ClientCallImpl.startInternal(ClientCallImpl.java:253)
at io.grpc.internal.ClientCallImpl.start(ClientCallImpl.java:210)
at io.grpc.ForwardingClientCall.start(ForwardingClientCall.java:32)
at
com.google.api.gax.grpc.GrpcHeaderInterceptor$1.start(GrpcHeaderInterceptor.java:94)
at io.grpc.stub.ClientCalls.startCall(ClientCalls.java:314)
at io.grpc.stub.ClientCalls.asyncUnaryRequestCall(ClientCalls.java:288)
at io.grpc.stub.ClientCalls.futureUnaryCall(ClientCalls.java:200)
at
com.google.api.gax.grpc.GrpcDirectCallable.futureCall(GrpcDirectCallable.java:58)
at
com.google.api.gax.grpc.GrpcUnaryRequestParamCallable.futureCall(GrpcUnaryRequestParamCallable.java:65)
at
com.google.api.gax.grpc.GrpcExceptionCallable.futureCall(GrpcExceptionCallable.java:64)
at com.google.api.gax.rpc.AttemptCallable.call(AttemptCallable.java:86)
at
com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:63)
at
com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:41)
at
com.google.api.gax.tracing.TracedBatchingCallable.futureCall(TracedBatchingCallable.java:82)
at
com.google.api.gax.rpc.BatchingCallable.futureCall(BatchingCallable.java:79)
at com.google.api.gax.rpc.UnaryCallable$1.futureCall(UnaryCallable.java:126)
at com.google.api.gax.rpc.UnaryCallable.futureCall(UnaryCallable.java:87)
at com.google.cloud.pubsub.v1.Publisher.publishCall(Publisher.java:425)
at
com.google.cloud.pubsub.v1.Publisher.publishOutstandingBatch(Publisher.java:471)
at com.google.cloud.pubsub.v1.Publisher.access$800(Publisher.java:88)
at com.google.cloud.pubsub.v1.Publisher$1.run(Publisher.java:292)
... 6 more{noformat}
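The trace above shows the publish failure being raised in `PubSubSink$FailureHandler.onFailure` and surfacing under `snapshotState`, which would explain why data can appear to be published while checkpoints fail. The following is a minimal JDK-only sketch of that general pattern, in which an asynchronous callback records an error and the next snapshot rethrows it. `DeferredFailureSink` and its methods are hypothetical and are not the connector's actual code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of deferred failure reporting, not Flink's implementation.
class DeferredFailureSink {

    // Holds the first failure reported by any asynchronous publish.
    private final AtomicReference<Throwable> asyncError = new AtomicReference<>();

    /** Registers a callback that records a failure instead of throwing on the caller's thread. */
    void publish(CompletableFuture<String> pendingPublish) {
        pendingPublish.whenComplete((messageId, error) -> {
            if (error != null) {
                asyncError.compareAndSet(
                        null, new RuntimeException("Failed trying to publish message", error));
            }
        });
    }

    /** Called at checkpoint time: rethrows a recorded failure so the checkpoint is declined. */
    void snapshotState() {
        Throwable error = asyncError.getAndSet(null);
        if (error != null) {
            throw new IllegalStateException("Checkpoint was declined.", error);
        }
    }

    public static void main(String[] args) {
        DeferredFailureSink sink = new DeferredFailureSink();
        CompletableFuture<String> pending = new CompletableFuture<>();
        sink.publish(pending);
        pending.completeExceptionally(
                new IllegalStateException("Could not find policy 'pick_first'."));
        try {
            sink.snapshotState();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + " Cause: " + e.getCause().getMessage());
        }
    }
}
```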
was:
I got this exception on the Flink TaskManager, but I can see that the data is
successfully published to Pub/Sub. Here is the log:
{noformat}
2022-04-25 07:53:44,293 INFO org.apache.flink.runtime.taskmanager.Task
[] - Sink: Pub-sub-sink (1/2)#0 (cffc263c60c6c0c8c56092a8386601eb) switched
from INITIALIZING to RUNNING.
Apr 25, 2022 7:53:45 AM io.grpc.internal.ManagedChannelImpl$1 uncaughtException
SEVERE: [Channel<1>: (pubsub.googleapis.com:443)] Uncaught exception in the
SynchronizationContext. Panic!
java.lang.IllegalStateException: Could not find policy 'pick_first'. Make sure
its implementation is either registered to LoadBalancerRegistry or included in
META-INF/services/io.grpc.LoadBalancerProvider from your jar files.
at
io.grpc.internal.AutoConfiguredLoadBalancerFactory$AutoConfiguredLoadBalancer.<init>(AutoConfiguredLoadBalancerFactory.java:94)
at
io.grpc.internal.AutoConfiguredLoadBalancerFactory.newLoadBalancer(AutoConfiguredLoadBalancerFactory.java:65)
at
io.grpc.internal.ManagedChannelImpl.exitIdleMode(ManagedChannelImpl.java:375)
at
io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider$1ExitIdleModeForTransport.run(ManagedChannelImpl.java:469)
at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
at
io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider.get(ManagedChannelImpl.java:473)
at io.grpc.internal.ClientCallImpl.startInternal(ClientCallImpl.java:253)
at io.grpc.internal.ClientCallImpl.start(ClientCallImpl.java:210)
at io.grpc.ForwardingClientCall.start(ForwardingClientCall.java:32)
at
com.google.api.gax.grpc.GrpcHeaderInterceptor$1.start(GrpcHeaderInterceptor.java:94)
at io.grpc.stub.ClientCalls.startCall(ClientCalls.java:314)
at io.grpc.stub.ClientCalls.asyncUnaryRequestCall(ClientCalls.java:288)
at io.grpc.stub.ClientCalls.futureUnaryCall(ClientCalls.java:200)
at
com.google.api.gax.grpc.GrpcDirectCallable.futureCall(GrpcDirectCallable.java:58)
at
com.google.api.gax.grpc.GrpcUnaryRequestParamCallable.futureCall(GrpcUnaryRequestParamCallable.java:65)
at
com.google.api.gax.grpc.GrpcExceptionCallable.futureCall(GrpcExceptionCallable.java:64)
at com.google.api.gax.rpc.AttemptCallable.call(AttemptCallable.java:86)
at
com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:63)
at
com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:41)
at
com.google.api.gax.tracing.TracedBatchingCallable.futureCall(TracedBatchingCallable.java:82)
at
com.google.api.gax.rpc.BatchingCallable.futureCall(BatchingCallable.java:79)
at com.google.api.gax.rpc.UnaryCallable$1.futureCall(UnaryCallable.java:126)
at com.google.api.gax.rpc.UnaryCallable.futureCall(UnaryCallable.java:87)
at com.google.cloud.pubsub.v1.Publisher.publishCall(Publisher.java:425)
at
com.google.cloud.pubsub.v1.Publisher.publishOutstandingBatch(Publisher.java:471)
at
com.google.cloud.pubsub.v1.Publisher.publishAllWithoutInflight(Publisher.java:399)
at com.google.cloud.pubsub.v1.Publisher.access$1100(Publisher.java:88)
at com.google.cloud.pubsub.v1.Publisher$2.run(Publisher.java:326)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
{noformat}
The code sample:
{code:java}
SinkFunction<String> pubsubSink = PubSubSink.newBuilder()
        .withSerializationSchema((SerializationSchema<String>) s ->
                s.getBytes(StandardCharsets.UTF_8))
        .withProjectName("<project-name>")
        .withTopicName("<topic-name>")
        .build();
dataStream.addSink(pubsubSink)
        .name("Pub-sub-sink");
{code}
I use Maven Assembly Plugin to create the uber JAR:
{noformat}
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-assembly-plugin</artifactId>
  <version>2.6</version>
  <configuration>
    <archive>
      <manifest>
        <mainClass>org.example.flink.Main</mainClass>
      </manifest>
    </archive>
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
  </configuration>
  <executions>
    <execution>
      <id>make-assembly</id>
      <phase>package</phase>
      <goals>
        <goal>single</goal>
      </goals>
    </execution>
  </executions>
</plugin>
{noformat}
The content of the JAR:
{noformat}
➜ jar tf MyApp.jar | grep io.grpc.LoadBalancerProvider
io/grpc/LoadBalancerProvider$UnknownConfig.class
META-INF/services/io.grpc.LoadBalancerProvider
io/grpc/LoadBalancerProvider.class
➜ jar tf MyApp.jar | grep io.grpc.NameResolverProvider
io/grpc/NameResolverProvider.class
META-INF/services/io.grpc.NameResolverProvider
{noformat}
What I've tried to solve this:
# Downgrading version to 1.14.2
{noformat}
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-gcp-pubsub_2.12</artifactId>
<version>1.14.2</version>
</dependency>{noformat}
# Using the Maven Shade Plugin (alongside the Maven Assembly Plugin) with the
following config, as suggested [here|#issuecomment-474739796]:
{noformat}
<transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
  <resource>META-INF/services</resource>
  <file>io.grpc.LoadBalancerProvider</file>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
  <resource>META-INF/services</resource>
  <file>io.grpc.NameResolverProvider</file>
</transformer>
{noformat}
3. Creating files inside META-INF/services as suggested in
[here|https://github.com/googleapis/google-cloud-java/issues/4700#issuecomment-477658832]:
{noformat}
Data-Feeder-Simulation/FlinkSample/src/main/resources/META-INF/services on master [⇡!+?]
➜ ls
io.grpc.LoadBalancerProvider
io.grpc.NameResolverProvider
Data-Feeder-Simulation/FlinkSample/src/main/resources/META-INF/services on master [⇡!+?]
➜ cat io.grpc.LoadBalancerProvider
io.grpc.internal.PickFirstLoadBalancerProvider
Data-Feeder-Simulation/FlinkSample/src/main/resources/META-INF/services on master [⇡!+?]
➜ cat io.grpc.NameResolverProvider
io.grpc.internal.DnsNameResolverProvider
{noformat}
[UPDATE 09 June 2022]:
I confirm that this exception causes the checkpoint to fail:
{noformat}
java.io.IOException: Could not perform checkpoint 1 for operator Sink:
Pub-sub-sink (2/2)#2.
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1210)
at
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
at
org.apache.flink.streaming.runtime.io.checkpointing.AlternatingWaitingForFirstBarrierUnaligned.barrierReceived(AlternatingWaitingForFirstBarrierUnaligned.java:78)
at
org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:55)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
at
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
at
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not
complete snapshot 1 for operator Sink: Pub-sub-sink (2/2)#2. Failure reason:
Checkpoint was declined.
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:227)
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:212)
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:647)
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:320)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$12(StreamTask.java:1253)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1241)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1198)
... 22 more
Caused by: java.lang.RuntimeException: Failed trying to publish message
at
org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink$FailureHandler.onFailure(PubSubSink.java:342)
at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
at
com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1050)
at
org.apache.flink.util.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
at
com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1176)
at
com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969)
at
com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760)
at
com.google.api.core.AbstractApiFuture$InternalSettableFuture.setException(AbstractApiFuture.java:95)
at
com.google.api.core.AbstractApiFuture.setException(AbstractApiFuture.java:77)
at
com.google.api.core.SettableApiFuture.setException(SettableApiFuture.java:52)
at
com.google.cloud.pubsub.v1.Publisher$OutstandingBatch.onFailure(Publisher.java:519)
at
com.google.cloud.pubsub.v1.Publisher$OutstandingBatch.access$1500(Publisher.java:486)
at com.google.cloud.pubsub.v1.Publisher$3.onFailure(Publisher.java:462)
at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
at
com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1050)
at
com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
at
com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1176)
at
com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969)
at
com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760)
at
com.google.api.gax.retrying.BasicRetryingFuture.handleAttempt(BasicRetryingFuture.java:179)
at
com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.handle(CallbackChainRetryingFuture.java:135)
at
com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.run(CallbackChainRetryingFuture.java:117)
at
com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
at
com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1176)
at
com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969)
at
com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760)
at
com.google.api.core.AbstractApiFuture$InternalSettableFuture.setException(AbstractApiFuture.java:95)
at
com.google.api.core.AbstractApiFuture.setException(AbstractApiFuture.java:77)
at
com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
at
com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1050)
at
com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
at
com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1176)
at
com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969)
at
com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760)
at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:545)
at
io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:515)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:426)
at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66)
at
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:689)
at
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$900(ClientCallImpl.java:577)
at
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:751)
at
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:740)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown
Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
Source)
... 1 more
Caused by: com.google.api.gax.rpc.InternalException:
io.grpc.StatusRuntimeException: INTERNAL: Panic! This is a bug!
at
com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:67)
at
com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
at
com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
... 23 more
Caused by: io.grpc.StatusRuntimeException: INTERNAL: Panic! This is a bug!
at io.grpc.Status.asRuntimeException(Status.java:533)
... 15 more
Caused by: java.lang.IllegalStateException: Could not find policy 'pick_first'.
Make sure its implementation is either registered to LoadBalancerRegistry or
included in META-INF/services/io.grpc.LoadBalancerProvider from your jar files.
at
io.grpc.internal.AutoConfiguredLoadBalancerFactory$AutoConfiguredLoadBalancer.<init>(AutoConfiguredLoadBalancerFactory.java:94)
at
io.grpc.internal.AutoConfiguredLoadBalancerFactory.newLoadBalancer(AutoConfiguredLoadBalancerFactory.java:65)
at
io.grpc.internal.ManagedChannelImpl.exitIdleMode(ManagedChannelImpl.java:375)
at
io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider$1ExitIdleModeForTransport.run(ManagedChannelImpl.java:469)
at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
at
io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider.get(ManagedChannelImpl.java:473)
at io.grpc.internal.ClientCallImpl.startInternal(ClientCallImpl.java:253)
at io.grpc.internal.ClientCallImpl.start(ClientCallImpl.java:210)
at io.grpc.ForwardingClientCall.start(ForwardingClientCall.java:32)
at
com.google.api.gax.grpc.GrpcHeaderInterceptor$1.start(GrpcHeaderInterceptor.java:94)
at io.grpc.stub.ClientCalls.startCall(ClientCalls.java:314)
at io.grpc.stub.ClientCalls.asyncUnaryRequestCall(ClientCalls.java:288)
at io.grpc.stub.ClientCalls.futureUnaryCall(ClientCalls.java:200)
at
com.google.api.gax.grpc.GrpcDirectCallable.futureCall(GrpcDirectCallable.java:58)
at
com.google.api.gax.grpc.GrpcUnaryRequestParamCallable.futureCall(GrpcUnaryRequestParamCallable.java:65)
at
com.google.api.gax.grpc.GrpcExceptionCallable.futureCall(GrpcExceptionCallable.java:64)
at com.google.api.gax.rpc.AttemptCallable.call(AttemptCallable.java:86)
at
com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:63)
at
com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:41)
at
com.google.api.gax.tracing.TracedBatchingCallable.futureCall(TracedBatchingCallable.java:82)
at
com.google.api.gax.rpc.BatchingCallable.futureCall(BatchingCallable.java:79)
at com.google.api.gax.rpc.UnaryCallable$1.futureCall(UnaryCallable.java:126)
at com.google.api.gax.rpc.UnaryCallable.futureCall(UnaryCallable.java:87)
at com.google.cloud.pubsub.v1.Publisher.publishCall(Publisher.java:425)
at
com.google.cloud.pubsub.v1.Publisher.publishOutstandingBatch(Publisher.java:471)
at com.google.cloud.pubsub.v1.Publisher.access$800(Publisher.java:88)
at com.google.cloud.pubsub.v1.Publisher$1.run(Publisher.java:292)
... 6 more{noformat}
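
The root exception says gRPC looks for the 'pick_first' policy in META-INF/services/io.grpc.LoadBalancerProvider. The sketch below is one way to check which copies of that file a class loader can see at runtime and which provider class names each one lists. It uses only the JDK. The class name ServiceFileCheck and the choice of the thread context class loader are assumptions made for illustration; the class loader that gRPC actually consults inside a Flink task may differ.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

// Hypothetical diagnostic helper, not part of Flink or gRPC.
final class ServiceFileCheck {
    // Resource path taken from the exception message.
    static final String RESOURCE = "META-INF/services/io.grpc.LoadBalancerProvider";

    // Parses service-file content: one class name per line, '#' starts a comment.
    public static List<String> parseProviders(BufferedReader reader) throws IOException {
        List<String> providers = new ArrayList<>();
        String line;
        while ((line = reader.readLine()) != null) {
            int hash = line.indexOf('#');
            if (hash >= 0) {
                line = line.substring(0, hash);
            }
            line = line.trim();
            if (!line.isEmpty()) {
                providers.add(line);
            }
        }
        return providers;
    }

    // Convenience overload for content that is already in memory.
    public static List<String> parseProviders(String content) throws IOException {
        return parseProviders(new BufferedReader(new StringReader(content)));
    }

    public static void main(String[] args) throws IOException {
        // Assumption: inspect the thread context class loader, falling back
        // to the loader of this class when none is set.
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ServiceFileCheck.class.getClassLoader();
        }
        Enumeration<URL> urls = loader.getResources(RESOURCE);
        if (!urls.hasMoreElements()) {
            System.out.println("No " + RESOURCE + " visible to " + loader);
        }
        while (urls.hasMoreElements()) {
            URL url = urls.nextElement();
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
                System.out.println(url + " -> " + parseProviders(reader));
            }
        }
    }
}
```

Calling the same lookup from inside the job (for example from the sink's open method) rather than from a standalone main would show what the task's class loader sees, which may not match what "jar tf" reports for the uber JAR.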
> IllegalStateException: Could not find policy 'pick_first'. on Flink
> Application
> -------------------------------------------------------------------------------
>
> Key: FLINK-27395
> URL: https://issues.apache.org/jira/browse/FLINK-27395
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Google Cloud PubSub
> Affects Versions: 1.15.0, 1.14.2, 1.14.4
> Environment: # Minikube
> {noformat}
> ➜ minikube version
> minikube version: v1.25.2
> commit: 362d5fdc0a3dbee389b3d3f1034e8023e72bd3a7
> {noformat}
> # Apache Flink Docker Images
> {noformat}
> apache/flink:1.14.4-scala_2.11{noformat}
> {noformat}
> apache/flink:1.15.0-scala_2.12-java11{noformat}
> Reporter: Natan HP
> Priority: Major
>
> I got this exception on the Flink TaskManager, but I can see that the data is
> successfully published to Pub/Sub. Here is the log:
>
> {noformat}
> 2022-04-25 07:53:44,293 INFO org.apache.flink.runtime.taskmanager.Task
> [] - Sink: Pub-sub-sink (1/2)#0 (cffc263c60c6c0c8c56092a8386601eb) switched
> from INITIALIZING to RUNNING.
> Apr 25, 2022 7:53:45 AM io.grpc.internal.ManagedChannelImpl$1
> uncaughtException
> SEVERE: [Channel<1>: (pubsub.googleapis.com:443)] Uncaught exception in the
> SynchronizationContext. Panic!
> java.lang.IllegalStateException: Could not find policy 'pick_first'. Make
> sure its implementation is either registered to LoadBalancerRegistry or
> included in META-INF/services/io.grpc.LoadBalancerProvider from your jar
> files.
> at
> io.grpc.internal.AutoConfiguredLoadBalancerFactory$AutoConfiguredLoadBalancer.<init>(AutoConfiguredLoadBalancerFactory.java:94)
> at
> io.grpc.internal.AutoConfiguredLoadBalancerFactory.newLoadBalancer(AutoConfiguredLoadBalancerFactory.java:65)
> at
> io.grpc.internal.ManagedChannelImpl.exitIdleMode(ManagedChannelImpl.java:375)
> at
> io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider$1ExitIdleModeForTransport.run(ManagedChannelImpl.java:469)
> at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
> at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
> at
> io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider.get(ManagedChannelImpl.java:473)
> at io.grpc.internal.ClientCallImpl.startInternal(ClientCallImpl.java:253)
> at io.grpc.internal.ClientCallImpl.start(ClientCallImpl.java:210)
> at io.grpc.ForwardingClientCall.start(ForwardingClientCall.java:32)
> at
> com.google.api.gax.grpc.GrpcHeaderInterceptor$1.start(GrpcHeaderInterceptor.java:94)
> at io.grpc.stub.ClientCalls.startCall(ClientCalls.java:314)
> at io.grpc.stub.ClientCalls.asyncUnaryRequestCall(ClientCalls.java:288)
> at io.grpc.stub.ClientCalls.futureUnaryCall(ClientCalls.java:200)
> at
> com.google.api.gax.grpc.GrpcDirectCallable.futureCall(GrpcDirectCallable.java:58)
> at
> com.google.api.gax.grpc.GrpcUnaryRequestParamCallable.futureCall(GrpcUnaryRequestParamCallable.java:65)
> at
> com.google.api.gax.grpc.GrpcExceptionCallable.futureCall(GrpcExceptionCallable.java:64)
> at com.google.api.gax.rpc.AttemptCallable.call(AttemptCallable.java:86)
> at
> com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:63)
> at
> com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:41)
> at
> com.google.api.gax.tracing.TracedBatchingCallable.futureCall(TracedBatchingCallable.java:82)
> at
> com.google.api.gax.rpc.BatchingCallable.futureCall(BatchingCallable.java:79)
> at
> com.google.api.gax.rpc.UnaryCallable$1.futureCall(UnaryCallable.java:126)
> at com.google.api.gax.rpc.UnaryCallable.futureCall(UnaryCallable.java:87)
> at com.google.cloud.pubsub.v1.Publisher.publishCall(Publisher.java:425)
> at
> com.google.cloud.pubsub.v1.Publisher.publishOutstandingBatch(Publisher.java:471)
> at
> com.google.cloud.pubsub.v1.Publisher.publishAllWithoutInflight(Publisher.java:399)
> at com.google.cloud.pubsub.v1.Publisher.access$1100(Publisher.java:88)
> at com.google.cloud.pubsub.v1.Publisher$2.run(Publisher.java:326)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> {noformat}
>
>
> The code sample:
> {code:java}
> SinkFunction<String> pubsubSink = PubSubSink.newBuilder()
>         .withSerializationSchema((SerializationSchema<String>) s ->
>                 s.getBytes(StandardCharsets.UTF_8))
>         .withProjectName("<project-name>")
>         .withTopicName("<topic-name>")
>         .build();
> dataStream.addSink(pubsubSink)
>         .name("Pub-sub-sink");
> {code}
>
> I use Maven Assembly Plugin to create the uber JAR:
> {noformat}
> <plugin>
>   <groupId>org.apache.maven.plugins</groupId>
>   <artifactId>maven-assembly-plugin</artifactId>
>   <version>2.6</version>
>   <configuration>
>     <archive>
>       <manifest>
>         <mainClass>org.example.flink.Main</mainClass>
>       </manifest>
>     </archive>
>     <descriptorRefs>
>       <descriptorRef>jar-with-dependencies</descriptorRef>
>     </descriptorRefs>
>   </configuration>
>   <executions>
>     <execution>
>       <id>make-assembly</id>
>       <phase>package</phase>
>       <goals>
>         <goal>single</goal>
>       </goals>
>     </execution>
>   </executions>
> </plugin>{noformat}
>
> The content of the JAR:
> {noformat}
> ➜ jar tf MyApp.jar | grep io.grpc.LoadBalancerProvider
> io/grpc/LoadBalancerProvider$UnknownConfig.class
> META-INF/services/io.grpc.LoadBalancerProvider
> io/grpc/LoadBalancerProvider.class
> ➜ jar tf MyApp.jar | grep io.grpc.NameResolverProvider
> io/grpc/NameResolverProvider.class
> META-INF/services/io.grpc.NameResolverProvider
> {noformat}
>
> What I've tried to solve this:
> # Downgrading version to 1.14.2
> {noformat}
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-gcp-pubsub_2.12</artifactId>
>   <version>1.14.2</version>
> </dependency>{noformat}
> # Using maven shade plugin (alongside maven assembly plugin) with the
> following config, as suggested [here|#issuecomment-474739796]:
> {noformat}
> <transformer
>     implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
>   <resource>META-INF/services</resource>
>   <file>io.grpc.LoadBalancerProvider</file>
> </transformer>
> <transformer
>     implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
>   <resource>META-INF/services</resource>
>   <file>io.grpc.NameResolverProvider</file>
> </transformer>{noformat}
>
> # Creating files inside META-INF/services as suggested in
> [here|https://github.com/googleapis/google-cloud-java/issues/4700#issuecomment-477658832]:
> {noformat}
> Data-Feeder-Simulation/FlinkSample/src/main/resources/META-INF/services on master [⇡!+?]
> ➜ ls
> io.grpc.LoadBalancerProvider
> io.grpc.NameResolverProvider
> Data-Feeder-Simulation/FlinkSample/src/main/resources/META-INF/services on master [⇡!+?]
> ➜ cat io.grpc.LoadBalancerProvider
> io.grpc.internal.PickFirstLoadBalancerProvider
> Data-Feeder-Simulation/FlinkSample/src/main/resources/META-INF/services on master [⇡!+?]
> ➜ cat io.grpc.NameResolverProvider
> io.grpc.internal.DnsNameResolverProvider
> {noformat}
> # Upgrading to 1.15.0
> {noformat}
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-gcp-pubsub</artifactId>
>   <version>1.15.0</version>
> </dependency>{noformat}
> [UPDATE 09 June 2022]:
> I confirm that this exception causes the checkpoint to fail:
> {noformat}
> java.io.IOException: Could not perform checkpoint 1 for operator Sink:
> Pub-sub-sink (2/2)#2.
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1210)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.AlternatingWaitingForFirstBarrierUnaligned.barrierReceived(AlternatingWaitingForFirstBarrierUnaligned.java:78)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:55)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
> at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> complete snapshot 1 for operator Sink: Pub-sub-sink (2/2)#2. Failure reason:
> Checkpoint was declined.
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:227)
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:212)
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:647)
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:320)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$12(StreamTask.java:1253)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1241)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1198)
> ... 22 more
> Caused by: java.lang.RuntimeException: Failed trying to publish message
> at
> org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink$FailureHandler.onFailure(PubSubSink.java:342)
> at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
> at
> com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1050)
> at
> org.apache.flink.util.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
> at
> com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1176)
> at
> com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969)
> at
> com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760)
> at
> com.google.api.core.AbstractApiFuture$InternalSettableFuture.setException(AbstractApiFuture.java:95)
> at
> com.google.api.core.AbstractApiFuture.setException(AbstractApiFuture.java:77)
> at
> com.google.api.core.SettableApiFuture.setException(SettableApiFuture.java:52)
> at
> com.google.cloud.pubsub.v1.Publisher$OutstandingBatch.onFailure(Publisher.java:519)
> at
> com.google.cloud.pubsub.v1.Publisher$OutstandingBatch.access$1500(Publisher.java:486)
> at com.google.cloud.pubsub.v1.Publisher$3.onFailure(Publisher.java:462)
> at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
> at
> com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1050)
> at
> com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
> at
> com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1176)
> at
> com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969)
> at
> com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760)
> at
> com.google.api.gax.retrying.BasicRetryingFuture.handleAttempt(BasicRetryingFuture.java:179)
> at
> com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.handle(CallbackChainRetryingFuture.java:135)
> at
> com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.run(CallbackChainRetryingFuture.java:117)
> at
> com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
> at
> com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1176)
> at
> com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969)
> at
> com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760)
> at
> com.google.api.core.AbstractApiFuture$InternalSettableFuture.setException(AbstractApiFuture.java:95)
> at
> com.google.api.core.AbstractApiFuture.setException(AbstractApiFuture.java:77)
> at
> com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
> at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
> at
> com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1050)
> at
> com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
> at
> com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1176)
> at
> com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969)
> at
> com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760)
> at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:545)
> at
> io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:515)
> at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:426)
> at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66)
> at
> io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:689)
> at
> io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$900(ClientCallImpl.java:577)
> at
> io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:751)
> at
> io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:740)
> at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
> at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown
> Source)
> at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
> at
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
> Source)
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
> Source)
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
> Source)
> ... 1 more
> Caused by: com.google.api.gax.rpc.InternalException:
> io.grpc.StatusRuntimeException: INTERNAL: Panic! This is a bug!
> at
> com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:67)
> at
> com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
> at
> com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
> ... 23 more
> Caused by: io.grpc.StatusRuntimeException: INTERNAL: Panic! This is a bug!
> at io.grpc.Status.asRuntimeException(Status.java:533)
> ... 15 more
> Caused by: java.lang.IllegalStateException: Could not find policy
> 'pick_first'. Make sure its implementation is either registered to
> LoadBalancerRegistry or included in
> META-INF/services/io.grpc.LoadBalancerProvider from your jar files.
> at
> io.grpc.internal.AutoConfiguredLoadBalancerFactory$AutoConfiguredLoadBalancer.<init>(AutoConfiguredLoadBalancerFactory.java:94)
> at
> io.grpc.internal.AutoConfiguredLoadBalancerFactory.newLoadBalancer(AutoConfiguredLoadBalancerFactory.java:65)
> at
> io.grpc.internal.ManagedChannelImpl.exitIdleMode(ManagedChannelImpl.java:375)
> at
> io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider$1ExitIdleModeForTransport.run(ManagedChannelImpl.java:469)
> at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
> at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
> at
> io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider.get(ManagedChannelImpl.java:473)
> at io.grpc.internal.ClientCallImpl.startInternal(ClientCallImpl.java:253)
> at io.grpc.internal.ClientCallImpl.start(ClientCallImpl.java:210)
> at io.grpc.ForwardingClientCall.start(ForwardingClientCall.java:32)
> at
> com.google.api.gax.grpc.GrpcHeaderInterceptor$1.start(GrpcHeaderInterceptor.java:94)
> at io.grpc.stub.ClientCalls.startCall(ClientCalls.java:314)
> at io.grpc.stub.ClientCalls.asyncUnaryRequestCall(ClientCalls.java:288)
> at io.grpc.stub.ClientCalls.futureUnaryCall(ClientCalls.java:200)
> at
> com.google.api.gax.grpc.GrpcDirectCallable.futureCall(GrpcDirectCallable.java:58)
> at
> com.google.api.gax.grpc.GrpcUnaryRequestParamCallable.futureCall(GrpcUnaryRequestParamCallable.java:65)
> at
> com.google.api.gax.grpc.GrpcExceptionCallable.futureCall(GrpcExceptionCallable.java:64)
> at com.google.api.gax.rpc.AttemptCallable.call(AttemptCallable.java:86)
> at
> com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:63)
> at
> com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:41)
> at
> com.google.api.gax.tracing.TracedBatchingCallable.futureCall(TracedBatchingCallable.java:82)
> at
> com.google.api.gax.rpc.BatchingCallable.futureCall(BatchingCallable.java:79)
> at
> com.google.api.gax.rpc.UnaryCallable$1.futureCall(UnaryCallable.java:126)
> at com.google.api.gax.rpc.UnaryCallable.futureCall(UnaryCallable.java:87)
> at com.google.cloud.pubsub.v1.Publisher.publishCall(Publisher.java:425)
> at
> com.google.cloud.pubsub.v1.Publisher.publishOutstandingBatch(Publisher.java:471)
> at com.google.cloud.pubsub.v1.Publisher.access$800(Publisher.java:88)
> at com.google.cloud.pubsub.v1.Publisher$1.run(Publisher.java:292)
> ... 6 more{noformat}