damccorm opened a new issue, #21509:
URL: https://github.com/apache/beam/issues/21509

   As per 
[https://the-asf.slack.com/archives/C9H0YNP3P/p1648752819277569?thread_ts=1648749667.880749&cid=C9H0YNP3P](https://the-asf.slack.com/archives/C9H0YNP3P/p1648752819277569?thread_ts=1648749667.880749&cid=C9H0YNP3P)
 [~ibzib], tagging [~dpcollins-google] 
   
   When using PubsubLiteIO.read, the GCP admin client makes excessive calls to 
google.cloud.pubsublite.v1.AdminService.GetTopicPartitions. In a pipeline that 
ran for 1 minute and 55 seconds this call is made over 1000 times which results 
in the quota for this API being reached – resulting in job failure.
   
   I looked into the history of the module, and I noticed that partition 
settings were exposed before 2.34.0, but have been removed. 
[https://github.com/apache/beam/commit/8a646aaa95e79a3f33dff204a659c8a221069ffe#diff-9828d5eb9f2fc844e12c0ebc87b3ffe12d7c4db5d9284a34b979598cf8fc6313R129](https://github.com/apache/beam/commit/8a646aaa95e79a3f33dff204a659c8a221069ffe#diff-9828d5eb9f2fc844e12c0ebc87b3ffe12d7c4db5d9284a34b979598cf8fc6313R129)
   
   !image-2022-03-31-15-11-09-570.png!!image-2022-03-31-15-20-15-033.png!
   
   To replicate:
   ```
   
   Pipeline p = Pipeline.create(options);
   SubscriberOptions opts = SubscriberOptions.newBuilder()
   
.setSubscriptionPath(SubscriptionPath.parse("projects/<project-id\>/locations/us-west1-a/subscriptions/<my-subscription\>"))
   .build();
   p.apply(PubsubLiteIO.read(opts));
   ```
   
   Logs:
   ```
   2022-03-29 15:31:02
   org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
       at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
       at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
       at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
       at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
       at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
       at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
       at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
       at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
       at sun.reflect.GeneratedMethodAccessor80.invoke(Unknown Source)
       at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
       at java.lang.reflect.Method.invoke(Method.java:498)
       at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
       at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
       at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
       at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
       at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
       at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
       at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
       at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
       at akka.actor.ActorCell.invoke(ActorCell.scala:561)
       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
       at akka.dispatch.Mailbox.run(Mailbox.scala:225)
       at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
       at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
       at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
       at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
       at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
   Caused by: java.util.concurrent.ExecutionException: 
org.apache.beam.sdk.util.UserCodeException: 
com.google.api.gax.rpc.ResourceExhaustedException: 
io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED: Quota exceeded for quota 
metric 'Read-only administrator operations' and limit 'Read-only administrator 
operations per minute per region' of service 'pubsublite.googleapis.com' for 
consumer 'project_number:<THE PROJECT NUMBER REDACTED\>'.
       at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
       at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
       at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:168)
       at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:131)
       at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:453)
       at 
org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:694)
       at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:645)
       at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
       at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
       at java.lang.Thread.run(Thread.java:750)
   Caused by: org.apache.beam.sdk.util.UserCodeException: 
com.google.api.gax.rpc.ResourceExhaustedException: 
io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED: Quota exceeded for quota 
metric 'Read-only administrator operations' and limit 'Read-only administrator 
operations per minute per region' of service 'pubsublite.googleapis.com' for 
consumer 'project_number:<THE PROJECT NUMBER REDACTED\>'.
       at 
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
       at 
org.apache.beam.sdk.transforms.Watch$WatchGrowthFn$DoFnInvoker.invokeProcessElement(Unknown
 Source)
       at 
org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:123)
       at 
org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:523)
       at 
org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn$DoFnInvoker.invokeProcessElement(Unknown
 Source)
       at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
       at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
       at 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
       at 
org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator.fireTimer(SplittableDoFnOperator.java:174)
       at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:1001)
       at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$FlinkTimerInternals.processPendingProcessingTimeTimers(DoFnOperator.java:1356)
       at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.close(DoFnOperator.java:587)
       at 
org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator.close(SplittableDoFnOperator.java:182)
       at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:213)
       at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
       at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:210)
       at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:185)
       at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
       at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
       at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:97)
       at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:162)
       ... 9 more
   Caused by: com.google.api.gax.rpc.ResourceExhaustedException: 
io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED: Quota exceeded for quota 
metric 'Read-only administrator operations' and limit 'Read-only administrator 
operations per minute per region' of service 'pubsublite.googleapis.com' for 
consumer 'project_number:<THE PROJECT NUMBER REDACTED\>'.
       at 
com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:57)
       at 
com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
       at 
com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
       at 
com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
       at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
       at 
com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1074)
       at 
com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
       at 
com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1213)
       at 
com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:983)
       at 
com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:771)
       at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:563)
       at 
io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:533)
       at 
io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:463)
       at 
io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:427)
       at 
io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:460)
       at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:553)
       at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:68)
       at 
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:739)
       at 
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:718)
       at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
       at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
       at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
       at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
       at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
       at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
       at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   Caused by: io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED: Quota 
exceeded for quota metric 'Read-only administrator operations' and limit 
'Read-only administrator operations per minute per region' of service 
'pubsublite.googleapis.com' for consumer 'project_number:<THE PROJECT NUMBER 
REDACTED\>'.
       at io.grpc.Status.asRuntimeException(Status.java:535)
       at 
io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:533)
       at 
io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:463)
       at 
io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:427)
       at 
io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:460)
       at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:553)
       at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:68)
       at 
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:739)
       at 
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:718)
       at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
       at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
       at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
       at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
       at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
       at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
       at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
       at java.lang.Thread.run(Thread.java:750)
   
   ```
   
    
   
   Flink version: 1.13.5
   Beam version: 2.34.0
   
   Imported from Jira 
[BEAM-14223](https://issues.apache.org/jira/browse/BEAM-14223). Original Jira 
may contain additional context.
   Reported by: daniel.lindeman.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to