damccorm opened a new issue, #21509: URL: https://github.com/apache/beam/issues/21509
As per [~ibzib] in this [Slack thread](https://the-asf.slack.com/archives/C9H0YNP3P/p1648752819277569?thread_ts=1648749667.880749&cid=C9H0YNP3P), tagging [~dpcollins-google].

When using `PubsubLiteIO.read`, the GCP admin client makes excessive calls to `google.cloud.pubsublite.v1.AdminService.GetTopicPartitions`. In a pipeline that ran for 1 minute and 55 seconds, this call was made over 1,000 times, which exhausted the quota for this API and caused the job to fail.

Looking through the module's history, I noticed that partition settings were exposed before 2.34.0 but have since been removed: [commit 8a646aa](https://github.com/apache/beam/commit/8a646aaa95e79a3f33dff204a659c8a221069ffe#diff-9828d5eb9f2fc844e12c0ebc87b3ffe12d7c4db5d9284a34b979598cf8fc6313R129)

(Screenshots attached to the original Jira issue: `image-2022-03-31-15-11-09-570.png`, `image-2022-03-31-15-20-15-033.png`)
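Over 1,000 calls in 115 seconds averages at least about 8.7 `GetTopicPartitions` calls per second (roughly 520 per minute). One general way to bound this kind of admin traffic is to cache the partition count and refresh it only periodically. The sketch below is a minimal, hypothetical illustration of that idea in plain Java: `CachedPartitionCount`, the `fetcher` that stands in for the admin RPC, and the 60-second refresh period are all assumptions made for illustration. They are not part of Beam's `PubsubLiteIO` API and are not necessarily how this issue was resolved.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
 * Hypothetical sketch: memoizes a partition count for a fixed refresh period so that
 * frequent lookups do not each trigger an admin RPC such as GetTopicPartitions.
 * Not part of Beam's PubsubLiteIO API.
 */
public final class CachedPartitionCount {
  private final Supplier<Long> fetcher; // stands in for the quota-limited admin call
  private final LongSupplier clockMillis; // injectable clock so the sketch is testable
  private final long refreshMillis;
  private long cachedValue;
  private long fetchedAtMillis;
  private boolean hasValue;

  public CachedPartitionCount(
      Supplier<Long> fetcher, LongSupplier clockMillis, long refreshMillis) {
    this.fetcher = fetcher;
    this.clockMillis = clockMillis;
    this.refreshMillis = refreshMillis;
  }

  /** Returns the cached count, calling the fetcher only when the cache is empty or stale. */
  public synchronized long get() {
    long now = clockMillis.getAsLong();
    if (!hasValue || now - fetchedAtMillis >= refreshMillis) {
      cachedValue = fetcher.get();
      fetchedAtMillis = now;
      hasValue = true;
    }
    return cachedValue;
  }

  public static void main(String[] args) {
    AtomicLong adminCalls = new AtomicLong();
    AtomicLong fakeNowMillis = new AtomicLong();
    CachedPartitionCount partitions =
        new CachedPartitionCount(
            () -> {
              adminCalls.incrementAndGet(); // count simulated GetTopicPartitions calls
              return 4L; // hypothetical partition count
            },
            fakeNowMillis::get,
            60_000L);

    // Simulate 1,000 lookups spread evenly over roughly 115 seconds.
    for (int i = 0; i < 1000; i++) {
      fakeNowMillis.set(i * 115L);
      partitions.get();
    }
    System.out.println(adminCalls.get()); // prints 2
  }
}
```

With a 60-second refresh period, the 1,000 simulated lookups trigger only two fetches. The trade-off is that a change to the topic's partition count is picked up up to one refresh period late.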
To replicate:

```java
import com.google.cloud.pubsublite.SubscriptionPath;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;

Pipeline p = Pipeline.create(options);
SubscriberOptions opts = SubscriberOptions.newBuilder()
    .setSubscriptionPath(SubscriptionPath.parse(
        "projects/<project-id>/locations/us-west1-a/subscriptions/<my-subscription>"))
    .build();
p.apply(PubsubLiteIO.read(opts));
p.run();
```

Logs:

```
2022-03-29 15:31:02
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
	at sun.reflect.GeneratedMethodAccessor80.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.ExecutionException: org.apache.beam.sdk.util.UserCodeException: com.google.api.gax.rpc.ResourceExhaustedException: io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED: Quota exceeded for quota metric 'Read-only administrator operations' and limit 'Read-only administrator operations per minute per region' of service 'pubsublite.googleapis.com' for consumer 'project_number:<THE PROJECT NUMBER REDACTED>'.
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:168)
	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:131)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:453)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:694)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:645)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.beam.sdk.util.UserCodeException: com.google.api.gax.rpc.ResourceExhaustedException: io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED: Quota exceeded for quota metric 'Read-only administrator operations' and limit 'Read-only administrator operations per minute per region' of service 'pubsublite.googleapis.com' for consumer 'project_number:<THE PROJECT NUMBER REDACTED>'.
	at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
	at org.apache.beam.sdk.transforms.Watch$WatchGrowthFn$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:123)
	at org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:523)
	at org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
	at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
	at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
	at org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator.fireTimer(SplittableDoFnOperator.java:174)
	at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:1001)
	at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$FlinkTimerInternals.processPendingProcessingTimeTimers(DoFnOperator.java:1356)
	at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.close(DoFnOperator.java:587)
	at org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator.close(SplittableDoFnOperator.java:182)
	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:213)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:210)
	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:185)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:97)
	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:162)
	... 9 more
Caused by: com.google.api.gax.rpc.ResourceExhaustedException: io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED: Quota exceeded for quota metric 'Read-only administrator operations' and limit 'Read-only administrator operations per minute per region' of service 'pubsublite.googleapis.com' for consumer 'project_number:<THE PROJECT NUMBER REDACTED>'.
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:57)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
	at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
	at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1074)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1213)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:983)
	at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:771)
	at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:563)
	at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:533)
	at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:463)
	at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:427)
	at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:460)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:553)
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:68)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:739)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:718)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Caused by: io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED: Quota exceeded for quota metric 'Read-only administrator operations' and limit 'Read-only administrator operations per minute per region' of service 'pubsublite.googleapis.com' for consumer 'project_number:<THE PROJECT NUMBER REDACTED>'.
	at io.grpc.Status.asRuntimeException(Status.java:535)
	at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:533)
	at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:463)
	at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:427)
	at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:460)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:553)
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:68)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:739)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:718)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
```

Flink version: 1.13.5

Beam version: 2.34.0

Imported from Jira [BEAM-14223](https://issues.apache.org/jira/browse/BEAM-14223). Original Jira may contain additional context.

Reported by: daniel.lindeman.

--

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
