[ https://issues.apache.org/jira/browse/BEAM-12365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17356620#comment-17356620 ]
Per Böckman commented on BEAM-12365:
------------------------------------
Our pipelines running on Dataflow in streaming mode started spamming the error logs after upgrading from Beam 2.27.0 to 2.29.0:
{code}
*~*~*~ Channel ManagedChannelImpl{logId=677, target=bigquerystorage.googleapis.com:443} was not shutdown properly!!! ~*~*~*
Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
{code}
Stacktrace:
{code}
java.lang.RuntimeException: ManagedChannel allocation site
    at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:93)
    at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
    at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
    at io.grpc.internal.ManagedChannelImplBuilder.build(ManagedChannelImplBuilder.java:612)
    at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:261)
    at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:340)
    at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.access$1600(InstantiatingGrpcChannelProvider.java:73)
    at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider$1.createSingleChannel(InstantiatingGrpcChannelProvider.java:214)
    at com.google.api.gax.grpc.ChannelPool.create(ChannelPool.java:72)
    at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:221)
    at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:204)
    at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:169)
    at com.google.cloud.bigquery.storage.v1beta2.stub.GrpcBigQueryWriteStub.create(GrpcBigQueryWriteStub.java:136)
    at com.google.cloud.bigquery.storage.v1beta2.stub.BigQueryWriteStubSettings.createStub(BigQueryWriteStubSettings.java:145)
    at com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient.<init>(BigQueryWriteClient.java:120)
    at com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient.create(BigQueryWriteClient.java:101)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.newBigQueryWriteClient(BigQueryServicesImpl.java:1255)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.access$800(BigQueryServicesImpl.java:135)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.<init>(BigQueryServicesImpl.java:521)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.<init>(BigQueryServicesImpl.java:449)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.getDatasetService(BigQueryServicesImpl.java:169)
    at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.flushRows(BatchedStreamingWrite.java:374)
    at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.access$800(BatchedStreamingWrite.java:69)
    at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements.finishBundle(BatchedStreamingWrite.java:271)
    at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements$DoFnInvoker.invokeFinishBundle(Unknown Source)
    at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.finishBundle(SimpleDoFnRunner.java:242)
    at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.finishBundle(SimpleParDoFn.java:432)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:56)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:103)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1430)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1109)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
{code}
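For reference, the shutdown sequence the warning asks for is the standard gRPC channel lifecycle, roughly as in the sketch below. This is illustrative only: it builds a channel directly (the target is copied from the warning), whereas in our pipeline the channel is created internally by the BigQuery Storage client, so user code never gets a chance to call this.
{code}
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.concurrent.TimeUnit;

public class ChannelShutdownSketch {
  public static void main(String[] args) throws InterruptedException {
    // Build a channel directly, purely for illustration of the lifecycle.
    ManagedChannel channel =
        ManagedChannelBuilder.forTarget("bigquerystorage.googleapis.com:443")
            .useTransportSecurity()
            .build();
    try {
      // ... issue RPCs on the channel ...
    } finally {
      // This is what the orphan-wrapper warning expects before the channel
      // becomes unreachable: request shutdown, wait, force if it does not finish.
      channel.shutdown();
      if (!channel.awaitTermination(30, TimeUnit.SECONDS)) {
        channel.shutdownNow();
      }
    }
  }
}
{code}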
"
The code used to set up output to the BQ sink:
{code}
WriteResult writeResult = ourEntityPCollection
    .apply("WriteEventsToBQ", BigQueryIO.<OurEntity>write()
        .to(eventTable)
        .withFormatFunction(this::toTableRow)
        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
        .withExtendedErrorInfo()
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

writeResult
    .getFailedInsertsWithErr()
    .apply("LogErrors", MapElements.into(TypeDescriptors.strings())
        .via(new LogErrorFn()));
{code}
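Judging from the trace above, DatasetServiceImpl creates a BigQueryWriteClient via newBigQueryWriteClient even though we only use STREAMING_INSERTS, and nothing seems to release it. BigQueryWriteClient is AutoCloseable, so the missing cleanup would presumably look something like the sketch below. This is illustrative only, not the actual Beam code: WriteClientCloseSketch is a made-up name, and the real fix belongs inside BigQueryServicesImpl.
{code}
import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient;
import java.io.IOException;

public class WriteClientCloseSketch {
  public static void main(String[] args) throws IOException {
    // BigQueryWriteClient implements AutoCloseable; try-with-resources makes
    // sure its underlying gRPC channels are shut down when we are done with it.
    try (BigQueryWriteClient writeClient = BigQueryWriteClient.create()) {
      // ... append rows via the Storage Write API ...
    }
    // Alternatively, when the client has to outlive a single scope, an explicit
    // writeClient.close() in a teardown/cleanup hook achieves the same effect.
  }
}
{code}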
> BigQueryWriteClient is not closed
> ---------------------------------
>
> Key: BEAM-12365
> URL: https://issues.apache.org/jira/browse/BEAM-12365
> Project: Beam
> Issue Type: Bug
> Components: io-java-gcp
> Reporter: Brian Hulette
> Assignee: Reuven Lax
> Priority: P2
>
> See BEAM-12359
> Since the BigQueryWriteClient added in https://github.com/apache/beam/pull/14309
> is not closed, any BigQueryIO user gets a SEVERE error complaining that it was
> not shut down:
> {code}
> 2021-05-18 23:39:57.985 CEST [main] INFO o.a.b.r.d.DataflowPipelineTranslator - Adding ReadBQMyClass/ReadTableRowsFromFieldsThroughStorageAPI_MyClass/Read(BigQueryStorageTableSource) as step s12
> 2021-05-18 11:39:57 PM io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference cleanQueue
> SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=3, target=bigquerystorage.googleapis.com:443} was not shutdown properly!!! ~*~*~* Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
> java.lang.RuntimeException: ManagedChannel allocation site
>     at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:93)
>     at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
>     at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
>     at io.grpc.internal.ManagedChannelImplBuilder.build(ManagedChannelImplBuilder.java:612)
>     at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:261)
>     at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:340)
>     at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.access$1600(InstantiatingGrpcChannelProvider.java:73)
>     at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider$1.createSingleChannel(InstantiatingGrpcChannelProvider.java:214)
>     at com.google.api.gax.grpc.ChannelPool.create(ChannelPool.java:72)
>     at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:221)
>     at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:204)
>     at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:169)
>     at com.google.cloud.bigquery.storage.v1beta2.stub.GrpcBigQueryWriteStub.create(GrpcBigQueryWriteStub.java:136)
>     at com.google.cloud.bigquery.storage.v1beta2.stub.BigQueryWriteStubSettings.createStub(BigQueryWriteStubSettings.java:145)
>     at com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient.<init>(BigQueryWriteClient.java:120)
>     at com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient.create(BigQueryWriteClient.java:101)
>     at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.newBigQueryWriteClient(BigQueryServicesImpl.java:1255)
>     at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.access$800(BigQueryServicesImpl.java:135)
>     at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.<init>(BigQueryServicesImpl.java:521)
>     at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.<init>(BigQueryServicesImpl.java:449)
>     at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.getDatasetService(BigQueryServicesImpl.java:169)
>     at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$TypedRead.validate(BigQueryIO.java:965)
>     at org.apache.beam.sdk.Pipeline$ValidateVisitor.enterCompositeTransform(Pipeline.java:661)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:575)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:579)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:579)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:239)
>     at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:213)
>     at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:468)
>     at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:597)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:321)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
> {code}