Abacn commented on issue #21247:
URL: https://github.com/apache/beam/issues/21247#issuecomment-2148748421

   This issue claims that a single problem row can cause the pipeline to fail, 
while the opposite issue, #25233, notes that failed rows get silently dropped.
   
   I find that both are true. In fact, from at least Beam 2.50.0 through the 
current version (2.57.0-SNAPSHOT), the pipeline can non-deterministically fail 
or pass on an invalid element that would cause a row write to fail.
   
   In some trials, failed rows are passed to "flattenErrors", which produces a 
PCollection<BigQueryStorageApiInsertError>.
   
   In other trials, a bundle fails with an error like:
   
   ```
   org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Append to stream projects/google.com:clouddfe/datasets/yathu_test/tables/repro_344735929/streams/Cic2ODBiNzllZi0wMDAwLTI4YmUtYWQ3Yy0xNDIyM2JiMDljMmU6czA failed with Status Code INVALID_ARGUMENT. The stream may not exist.
        at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
        at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn$DoFnInvoker.invokeFinishBundle(Unknown Source)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.finishBundle(FnApiDoFnRunner.java:1776)
        at org.apache.beam.fn.harness.data.PTransformFunctionRegistry.lambda$register$0(PTransformFunctionRegistry.java:116)
        at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:560)
        at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
        at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: java.lang.RuntimeException: Append to stream projects/google.com:clouddfe/datasets/yathu_test/tables/repro_344735929/streams/Cic2ODBiNzllZi0wMDAwLTI4YmUtYWQ3Yy0xNDIyM2JiMDljMmU6czA failed with Status Code INVALID_ARGUMENT. The stream may not exist.
        at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn$DestinationState.lambda$flush$8(StorageApiWriteUnshardedRecords.java:778)
        at org.apache.beam.sdk.io.gcp.bigquery.RetryManager.await(RetryManager.java:311)
        at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.flushAll(StorageApiWriteUnshardedRecords.java:965)
        at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.finishBundle(StorageApiWriteUnshardedRecords.java:1113)
   Caused by: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Cannot convert value to DATE, value: 6585321 on field date. Entity: projects/google.com:clouddfe/datasets/yathu_test/tables/repro_344735929/streams/Cic2ODBiNzllZi0wMDAwLTI4YmUtYWQ3Yy0xNDIyM2JiMDljMmU6czA
        at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:92)
        at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:41)
        at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:86)
        at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
        at com.google.api.gax.grpc.ExceptionResponseObserver.onErrorImpl(ExceptionResponseObserver.java:82)
        at com.google.api.gax.rpc.StateCheckingResponseObserver.onError(StateCheckingResponseObserver.java:84)
        at com.google.api.gax.grpc.GrpcDirectStreamController$ResponseObserverAdapter.onClose(GrpcDirectStreamController.java:148)
        at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
        at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
        at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
        at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:570)
        at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
        at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
        at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
        at io.grpc.census.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:814)
        at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
        at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
        at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
        at io.grpc.census.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:494)
        at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574)
        at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Cannot convert value to DATE, value: 6585321 on field date. Entity: projects/google.com:clouddfe/datasets/yathu_test/tables/repro_344735929/streams/Cic2ODBiNzllZi0wMDAwLTI4YmUtYWQ3Yy0xNDIyM2JiMDljMmU6czA
        at io.grpc.Status.asRuntimeException(Status.java:533)
        ... 22 more
   ```
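
For context on the root cause in the trace above: the rejected value 6585321 looks like a days-since-epoch DATE encoding that falls outside BigQuery's supported DATE range (0001-01-01 through 9999-12-31). That reading is my assumption, not something the trace states. A minimal sketch of a pre-write range check follows, using only the JDK; the class and method names are hypothetical and not part of Beam.

```java
import java.time.LocalDate;

// Hypothetical helper, not part of Beam or the BigQuery client libraries.
final class BigQueryDateRange {
    // BigQuery DATE supports 0001-01-01 through 9999-12-31 (inclusive).
    static final long MIN_EPOCH_DAY = LocalDate.of(1, 1, 1).toEpochDay();
    static final long MAX_EPOCH_DAY = LocalDate.of(9999, 12, 31).toEpochDay();

    // Assumption: the value is a count of days since 1970-01-01.
    static boolean isValidEpochDay(long epochDay) {
        return epochDay >= MIN_EPOCH_DAY && epochDay <= MAX_EPOCH_DAY;
    }

    public static void main(String[] args) {
        long fromStackTrace = 6585321L; // value reported in the error message
        System.out.println("in range: " + isValidEpochDay(fromStackTrace));
        System.out.println("as LocalDate: " + LocalDate.ofEpochDay(fromStackTrace));
    }
}
```

A check like this could run in a step ahead of the write to route out-of-range rows elsewhere, which would sidestep the non-deterministic behavior for this particular class of bad element. It does not address the underlying inconsistency described above.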

