ggprod opened a new issue, #22779:
URL: https://github.com/apache/beam/issues/22779

   ### What happened?
   
   Using a new [Dataflow template](https://github.com/GoogleCloudPlatform/DataflowTemplates/pull/429) 
that reads Spanner change streams via `SpannerIO.readChangeStream()`, the 
pipeline continuously reports large numbers of errors like the one below and 
does not forward all of the change stream traffic:
   ```
   Operation ongoing in step SpannerIO-ReadChangeStream-Read-change-stream-partition-ParMultiDo-ReadChangeStreamPartition-/ProcessElementAndRestrictionWithSizing-ptransform-60 for at least 332h25m00s without outputting or completing in state process
     at sun.misc.Unsafe.park(Native Method)
     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:338)
     at 
com.lmax.disruptor.MultiProducerSequencer.next(MultiProducerSequencer.java:136)
     at 
com.lmax.disruptor.MultiProducerSequencer.next(MultiProducerSequencer.java:105)
     at com.lmax.disruptor.RingBuffer.next(RingBuffer.java:263)
     at 
io.opencensus.impl.internal.DisruptorEventQueue$1.enqueue(DisruptorEventQueue.java:134)
     at 
io.opencensus.impl.internal.DisruptorEventQueue.enqueue(DisruptorEventQueue.java:162)
     at io.opencensus.implcore.stats.StatsManager.record(StatsManager.java:70)
     at 
io.opencensus.implcore.stats.MeasureMapImpl.record(MeasureMapImpl.java:82)
     at 
io.grpc.census.CensusStatsModule$CallAttemptsTracerFactory.<init>(CensusStatsModule.java:447)
     at 
io.grpc.census.CensusStatsModule$StatsClientInterceptor.interceptCall(CensusStatsModule.java:791)
     at 
io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156)
     at 
com.google.api.gax.grpc.GrpcChannelUUIDInterceptor.interceptCall(GrpcChannelUUIDInterceptor.java:52)
     at 
io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156)
     at 
com.google.api.gax.grpc.GrpcHeaderInterceptor.interceptCall(GrpcHeaderInterceptor.java:80)
     at 
io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156)
     at 
com.google.api.gax.grpc.GrpcMetadataHandlerInterceptor.interceptCall(GrpcMetadataHandlerInterceptor.java:54)
     at 
io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156)
     at 
com.google.cloud.spanner.spi.v1.SpannerErrorInterceptor.interceptCall(SpannerErrorInterceptor.java:64)
     at 
io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156)
     at 
com.google.cloud.spanner.spi.v1.LoggingInterceptor.interceptCall(LoggingInterceptor.java:68)
     at 
io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156)
     at 
com.google.cloud.spanner.spi.v1.HeaderInterceptor.interceptCall(HeaderInterceptor.java:72)
     at 
io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156)
     at io.grpc.internal.ManagedChannelImpl.newCall(ManagedChannelImpl.java:923)
     at 
io.grpc.internal.ForwardingManagedChannel.newCall(ForwardingManagedChannel.java:63)
     at 
io.grpc.stub.MetadataUtils$HeaderAttachingClientInterceptor.interceptCall(MetadataUtils.java:81)
     at 
io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156)
     at com.google.api.gax.grpc.GrpcClientCalls.newCall(GrpcClientCalls.java:99)
     at 
com.google.api.gax.grpc.GrpcDirectServerStreamingCallable.call(GrpcDirectServerStreamingCallable.java:65)
     at 
com.google.api.gax.grpc.GrpcServerStreamingRequestParamCallable.call(GrpcServerStreamingRequestParamCallable.java:61)
     at 
com.google.api.gax.grpc.GrpcExceptionServerStreamingCallable.call(GrpcExceptionServerStreamingCallable.java:59)
     at 
com.google.api.gax.rpc.WatchdogServerStreamingCallable.call(WatchdogServerStreamingCallable.java:69)
     at 
com.google.api.gax.rpc.ServerStreamingCallable$1.call(ServerStreamingCallable.java:237)
     at 
com.google.api.gax.rpc.ServerStreamingAttemptCallable.call(ServerStreamingAttemptCallable.java:234)
     at 
com.google.api.gax.rpc.ServerStreamingAttemptCallable.start(ServerStreamingAttemptCallable.java:194)
     at 
com.google.api.gax.rpc.RetryingServerStreamingCallable.call(RetryingServerStreamingCallable.java:87)
     at 
com.google.api.gax.tracing.TracedServerStreamingCallable.call(TracedServerStreamingCallable.java:76)
     at 
com.google.api.gax.rpc.ServerStreamingCallable$1.call(ServerStreamingCallable.java:237)
     at 
com.google.cloud.spanner.spi.v1.GapicSpannerRpc.executeQuery(GapicSpannerRpc.java:1506)
     at 
com.google.cloud.spanner.AbstractReadContext$1.startStream(AbstractReadContext.java:667)
     at 
com.google.cloud.spanner.AbstractResultSet$ResumableStreamIterator.computeNext(AbstractResultSet.java:1105)
     at 
com.google.cloud.spanner.AbstractResultSet$ResumableStreamIterator.computeNext(AbstractResultSet.java:986)
     at 
com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:146)
     at 
com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:141)
     at 
com.google.cloud.spanner.AbstractResultSet$GrpcValueIterator.ensureReady(AbstractResultSet.java:269)
     at 
com.google.cloud.spanner.AbstractResultSet$GrpcValueIterator.getMetadata(AbstractResultSet.java:245)
     at 
com.google.cloud.spanner.AbstractResultSet$GrpcResultSet.next(AbstractResultSet.java:119)
     at 
com.google.cloud.spanner.ForwardingResultSet.next(ForwardingResultSet.java:54)
     at 
com.google.cloud.spanner.SessionPool$AutoClosingReadContext$1.internalNext(SessionPool.java:272)
     at 
com.google.cloud.spanner.SessionPool$AutoClosingReadContext$1.next(SessionPool.java:252)
     at 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao.getPartition(PartitionMetadataDao.java:110)
     at 
org.apache.beam.sdk.io.gcp.spanner.changestreams.action.QueryChangeStreamAction.run(QueryChangeStreamAction.java:166)
     at 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn.processElement(ReadChangeStreamPartitionDoFn.java:234)
     at org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
     at 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1063)
     at 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:142)
     at 
org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:661)
     at 
org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:656)
     at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
     at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
     at 
org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:179)
     at org.apache.beam.fn.harness.BeamFnDataReadRunner$Factory$$Lambda$242/882597230.accept(Unknown Source)
     at 
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
     at 
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:515)
     at org.apache.beam.fn.harness.FnHarness$$Lambda$108/1459016715.apply(Unknown Source)
     at 
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
     at 
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
     at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver$$Lambda$117/539047905.run(Unknown Source)
     at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
     at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
     at java.lang.Thread.run(Thread.java:750)
   ```
   
   ### Issue Priority
   
   Priority: 1
   
   ### Issue Component
   
   Component: io-java-gcp


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to