[ 
https://issues.apache.org/jira/browse/BEAM-14394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17531367#comment-17531367
 ] 

Bruno Volpato da Cunha commented on BEAM-14394:
-----------------------------------------------

That's what I have been doing. However, I thought of it as a workaround and 
raised the issue to figure out if there was any bug that could be fixed.

In this case, I am going to resolve this issue.

 

Leaving a few stack traces that were obtained during the concurrent tests, as 
they may help others to find this discussion.
{code:java}
Exception in thread "main" java.lang.NullPointerException
    at 
org.apache.beam.runners.direct.ImmutabilityEnforcementFactory$ImmutabilityCheckingEnforcement.beforeElement(ImmutabilityEnforcementFactory.java:123)
    at 
org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:162)
    at 
org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
    at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
 {code}
{code:java}
Exception in thread "main" 
org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
java.lang.ArrayIndexOutOfBoundsException
    at 
org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
    at 
org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
    at Concurrency.main(Concurrency.java:48)
Caused by: java.lang.ArrayIndexOutOfBoundsException
    at 
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
 Method)
    at 
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at 
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
    at 
java.base/java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:603)
    at 
java.base/java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:678)
    at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:737)
    at 
java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
    at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
    at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at 
java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
    at 
java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:661)
    at Concurrency$1.processElement(Concurrency.java:29)
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 7 out of bounds for 
length 7
    at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList$Builder.add(ImmutableList.java:790)
    at 
org.apache.beam.runners.direct.ImmutableListBundleFactory$UncommittedImmutableListBundle.add(ImmutableListBundleFactory.java:103)
    at 
org.apache.beam.runners.direct.ImmutableListBundleFactory$UncommittedImmutableListBundle.add(ImmutableListBundleFactory.java:65)
    at 
org.apache.beam.runners.direct.CloningBundleFactory$CloningBundle.add(CloningBundleFactory.java:88)
    at 
org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:119)
    at 
org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:305)
    at 
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
    at 
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
    at 
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
    at 
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:404)
    at Concurrency$1.lambda$processElement$0(Concurrency.java:30)
    at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at 
java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
    at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at 
java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
    at 
java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at 
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at 
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
 {code}

> BigQueryIO not thread safe, may fail with ConcurrentModificationException
> -------------------------------------------------------------------------
>
>                 Key: BEAM-14394
>                 URL: https://issues.apache.org/jira/browse/BEAM-14394
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>    Affects Versions: 2.38.0
>            Reporter: Bruno Volpato da Cunha
>            Priority: P2
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> I am using the Java SDK with a ParDo, in which the code has some parallelism 
> (basically, a parallel Stream over an Iterable).
> This ParDo is outputting to a side output that is redirected to BigQuery, 
> using Streaming Inserts, with the following parameters:
>  
> {code:java}
> return BigQueryIO.<T>write()
>     .to(dynamicDestinations)
>     .withMethod(Write.Method.STREAMING_INSERTS)
>     .withCreateDisposition(CreateDisposition.CREATE_NEVER)
>     .withWriteDisposition(WriteDisposition.WRITE_APPEND)
>     .withCustomGcsTempLocation(StaticValueProvider.of(gcsTempLocation))
>     .withFormatFunction(formatFunction)
>     .withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry())
>     .withExtendedErrorInfo(); {code}
>  
>  
> Sometimes, the following error is observed:
>  
> {code:java}
> org.apache.beam.sdk.util.UserCodeException: 
> java.util.ConcurrentModificationException
>       at 
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
>       at 
> org.apache.beam.sdk.io.gcp.bigquery.CreateTables$CreateTablesFn$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:187)
>       at 
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
>       at 
> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>       at 
> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>       at 
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:404)
>       at 
> org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1.processElement(PrepareWrite.java:85)
>       at 
> org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
>       at 
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
>       at 
> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>       at 
> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>       at 
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:404)
>       at 
> org.apache.beam.sdk.transforms.FlatMapElements$2.processElement(FlatMapElements.java:156)
>       at 
> org.apache.beam.sdk.transforms.FlatMapElements$2$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:187)
>       at 
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
>       at 
> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>       at 
> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>       at 
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
>       at <<Some Parallel Custom Code.java>>
>       at 
> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>       at 
> java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
>       at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>       at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>       at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
>       at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>       at 
> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
>       at <<Some Parallel Custom Code.java>>
>         at 
> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>       at 
> java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
>       at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>       at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>       at 
> java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:952)
>       at 
> java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:926)
>       at 
> java.base/java.util.stream.AbstractTask.compute(AbstractTask.java:327)
>       at 
> java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746)
>       at 
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
>       at 
> java.base/java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:408)
>       at 
> java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:736)
>       at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:919)
>       at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
>       at 
> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
>       at <<Some Parallel Custom Code.java>>
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
>       at 
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
>       at 
> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>       at 
> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>       at 
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:404)
>         at <<Some Parallel Custom Code.java>> 
>         at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:187)
>       at 
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
>       at 
> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>       at 
> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>       at 
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:185)
>       at 
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:108)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1058)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:445)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:130)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1061)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:932)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:373)
>       at 
> org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:96)
>       at 
> org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:43)
>       at 
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
>       at 
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
>       at 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
>       at 
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:137)
>       at 
> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>       at 
> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>       at 
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:212)
>       at 
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
>       at 
> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
>       at 
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1445)
>       at 
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
>       at 
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1120)
>       at 
> org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:133)
>       at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>       at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>       at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.util.ConcurrentModificationException
>       at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1134)
>       at 
> org.apache.beam.sdk.io.gcp.bigquery.CreateTables$CreateTablesFn.processElement(CreateTables.java:104)
> " {code}
>  
>  
> Unfortunately, I was trying to come up with a toy example to post here but 
> could not reproduce it there.
>  
> When investigating the class CreateTables, it instantiates one 
> _Map<DestinationT, TableDestination> destinations = Maps.newHashMap()_ ** per 
> bundle, and _processElement_ can happen concurrently, following the user's 
> code.
> The current implementation for the Map is a HashMap. I believe that either 
> synchronizing the _destinations.computeIfAbsent_ call or simply initializing 
> it with Maps.newConcurrentMap() would be enough.
>  
> I am open to implementing this change and leaning towards just moving to a 
> ConcurrentHashMap, so just checking if I am not missing any detail.
> (Edit: see [https://github.com/apache/beam/pull/17532] )



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to