[ 
https://issues.apache.org/jira/browse/BEAM-12365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17356620#comment-17356620
 ] 

Per Böckman edited comment on BEAM-12365 at 6/3/21, 5:52 PM:
-------------------------------------------------------------

Our pipelines running on dataflow in streaming mode started to spam error logs 
after upgrading from beam 2.27.0 to 2.29.0.

 
Error message
{code:java}
Channel ManagedChannelImpl{logId=677, 
target=bigquerystorage.googleapis.com:443} was not shutdown properly!!! ~
 Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() 
returns true.
{code}

  
 Stacktrace
{code:java}
java.lang.RuntimeException: ManagedChannel allocation site
 at 
io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:93)
 at 
io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
 at 
io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
 at 
io.grpc.internal.ManagedChannelImplBuilder.build(ManagedChannelImplBuilder.java:612)
 at 
io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:261)
 at 
com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:340)
 at 
com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.access$1600(InstantiatingGrpcChannelProvider.java:73)
 at 
com.google.api.gax.grpc.InstantiatingGrpcChannelProvider$1.createSingleChannel(InstantiatingGrpcChannelProvider.java:214)
 at com.google.api.gax.grpc.ChannelPool.create(ChannelPool.java:72)
 at 
com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:221)
 at 
com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:204)
 at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:169)
 at 
com.google.cloud.bigquery.storage.v1beta2.stub.GrpcBigQueryWriteStub.create(GrpcBigQueryWriteStub.java:136)
 at 
com.google.cloud.bigquery.storage.v1beta2.stub.BigQueryWriteStubSettings.createStub(BigQueryWriteStubSettings.java:145)
 at 
com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient.<init>(BigQueryWriteClient.java:120)
 at 
com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient.create(BigQueryWriteClient.java:101)
 at 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.newBigQueryWriteClient(BigQueryServicesImpl.java:1255)
 at 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.access$800(BigQueryServicesImpl.java:135)
 at 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.<init>(BigQueryServicesImpl.java:521)
 at 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.<init>(BigQueryServicesImpl.java:449)
 at 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.getDatasetService(BigQueryServicesImpl.java:169)
 at 
org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.flushRows(BatchedStreamingWrite.java:374)
 at 
org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.access$800(BatchedStreamingWrite.java:69)
 at 
org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements.finishBundle(BatchedStreamingWrite.java:271)
 at 
org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements$DoFnInvoker.invokeFinishBundle(Unknown
 Source)
 at 
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.finishBundle(SimpleDoFnRunner.java:242)
 at 
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.finishBundle(SimpleParDoFn.java:432)
 at 
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:56)
 at 
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:103)
 at 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1430)
 at 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
 at 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1109)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 at java.base/java.lang.Thread.run(Thread.java:834){code}
Code used to set up output to the BQ sink
{code:java}
WriteResult writeResult = ourEntityPCollection
 .apply("WriteEventsToBQ", BigQueryIO.<OurEntity>write()
 .to(eventTable)
 .withFormatFunction(this::toTableRow)
 .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
 .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
 .withExtendedErrorInfo() 
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER) 
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
writeResult
 .getFailedInsertsWithErr()
 .apply("LogErrors", MapElements.into(TypeDescriptors.strings())
 .via(new LogErrorFn()));{code}
 


was (Author: dev2null):
Our pipelines running on dataflow in streaming mode started to spam error logs 
after upgrading from beam 2.27.0 to 2.29.0.

 
*~*~*~ Channel ManagedChannelImpl\{logId=677, 
target=bigquerystorage.googleapis.com:443} was not shutdown properly!!! ~*~*~*
Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() 
returns true.
 
Stacktrace
java.lang.RuntimeException: ManagedChannel allocation site
       at 
io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:93)
       at 
io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
       at 
io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
       at 
io.grpc.internal.ManagedChannelImplBuilder.build(ManagedChannelImplBuilder.java:612)
       at 
io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:261)
       at 
com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:340)
       at 
com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.access$1600(InstantiatingGrpcChannelProvider.java:73)
       at 
com.google.api.gax.grpc.InstantiatingGrpcChannelProvider$1.createSingleChannel(InstantiatingGrpcChannelProvider.java:214)
       at com.google.api.gax.grpc.ChannelPool.create(ChannelPool.java:72)
       at 
com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:221)
       at 
com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:204)
       at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:169)
       at 
com.google.cloud.bigquery.storage.v1beta2.stub.GrpcBigQueryWriteStub.create(GrpcBigQueryWriteStub.java:136)
       at 
com.google.cloud.bigquery.storage.v1beta2.stub.BigQueryWriteStubSettings.createStub(BigQueryWriteStubSettings.java:145)
       at 
com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient.<init>(BigQueryWriteClient.java:120)
       at 
com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient.create(BigQueryWriteClient.java:101)
       at 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.newBigQueryWriteClient(BigQueryServicesImpl.java:1255)
       at 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.access$800(BigQueryServicesImpl.java:135)
       at 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.<init>(BigQueryServicesImpl.java:521)
       at 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.<init>(BigQueryServicesImpl.java:449)
       at 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.getDatasetService(BigQueryServicesImpl.java:169)
       at 
org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.flushRows(BatchedStreamingWrite.java:374)
       at 
org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.access$800(BatchedStreamingWrite.java:69)
       at 
org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements.finishBundle(BatchedStreamingWrite.java:271)
       at 
org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements$DoFnInvoker.invokeFinishBundle(Unknown
 Source)
       at 
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.finishBundle(SimpleDoFnRunner.java:242)
       at 
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.finishBundle(SimpleParDoFn.java:432)
       at 
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:56)
       at 
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:103)
       at 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1430)
       at 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
       at 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1109)
       at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
       at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
       at java.base/java.lang.Thread.run(Thread.java:834)
   "  

The code used to set up output to the BQ Sink:
        WriteResult writeResult = ourEntityPCollection
            .apply("WriteEventsToBQ", BigQueryIO.<OurEntity>write()
                .to(eventTable)
                .withFormatFunction(this::toTableRow)
                
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                .withExtendedErrorInfo()              
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)         
       .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

        writeResult
            .getFailedInsertsWithErr()
            .apply("LogErrors", MapElements.into(TypeDescriptors.strings())
                .via(new LogErrorFn()));
 

> BigQueryWriteClient is not closed
> ---------------------------------
>
>                 Key: BEAM-12365
>                 URL: https://issues.apache.org/jira/browse/BEAM-12365
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>            Reporter: Brian Hulette
>            Assignee: Reuven Lax
>            Priority: P2
>
> See BEAM-12359
> Since the BigQueryWriteClient added in 
> https://github.com/apache/beam/pull/14309 is not closed, any BigQueryIO user 
> gets a SEVERE error complaining that it's not shutdown:
> {code}
> 2021-05-18 23:39:57.985 CEST [main] INFO  
> o.a.b.r.d.DataflowPipelineTranslator - Adding 
> ReadBQMyClass/ReadTableRowsFromFieldsThroughStorageAPI_MyClass/Read(BigQueryStorageTableSource)
>  as step s12
> 2021-05-18 11:39:57 PM 
> io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference 
> cleanQueue
> SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=3, 
> target=bigquerystorage.googleapis.com:443} was not shutdown properly!!! 
> ~*~*~*    Make sure to call shutdown()/shutdownNow() and wait until 
> awaitTermination() returns true.
> java.lang.RuntimeException: ManagedChannel allocation site at 
> io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:93)
>  at 
> io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
>  at 
> io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
>  at 
> io.grpc.internal.ManagedChannelImplBuilder.build(ManagedChannelImplBuilder.java:612)
>  at 
> io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:261)
>  at 
> com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:340)
>  at 
> com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.access$1600(InstantiatingGrpcChannelProvider.java:73)
>  at 
> com.google.api.gax.grpc.InstantiatingGrpcChannelProvider$1.createSingleChannel(InstantiatingGrpcChannelProvider.java:214)
>  at com.google.api.gax.grpc.ChannelPool.create(ChannelPool.java:72) at 
> com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:221)
>  at 
> com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:204)
>  at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:169) at 
> com.google.cloud.bigquery.storage.v1beta2.stub.GrpcBigQueryWriteStub.create(GrpcBigQueryWriteStub.java:136)
>  at 
> com.google.cloud.bigquery.storage.v1beta2.stub.BigQueryWriteStubSettings.createStub(BigQueryWriteStubSettings.java:145)
>  at 
> com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient.<init>(BigQueryWriteClient.java:120)
>  at 
> com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient.create(BigQueryWriteClient.java:101)
>  at 
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.newBigQueryWriteClient(BigQueryServicesImpl.java:1255)
>  at 
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.access$800(BigQueryServicesImpl.java:135)
>  at 
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.<init>(BigQueryServicesImpl.java:521)
>  at 
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.<init>(BigQueryServicesImpl.java:449)
>  at 
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.getDatasetService(BigQueryServicesImpl.java:169)
>  at 
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$TypedRead.validate(BigQueryIO.java:965)
>  at 
> org.apache.beam.sdk.Pipeline$ValidateVisitor.enterCompositeTransform(Pipeline.java:661)
>  at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:575)
>  at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:579)
>  at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:579)
>  at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:239)
>  at 
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:213)
>  at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:468) at 
> org.apache.beam.sdk.Pipeline.validate(Pipeline.java:597) at 
> org.apache.beam.sdk.Pipeline.run(Pipeline.java:321) at 
> org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to