[ https://issues.apache.org/jira/browse/BEAM-14394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bruno Volpato da Cunha updated BEAM-14394:
------------------------------------------
    Fix Version/s: Not applicable
       Resolution: Invalid
           Status: Resolved  (was: Triage Needed)

> BigQueryIO not thread safe, may fail with ConcurrentModificationException
> --------------------------------------------------------------------------
>
>                 Key: BEAM-14394
>                 URL: https://issues.apache.org/jira/browse/BEAM-14394
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>    Affects Versions: 2.38.0
>            Reporter: Bruno Volpato da Cunha
>            Priority: P2
>             Fix For: Not applicable
>
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> I am using the Java SDK with a ParDo, in which the code has some parallelism
> (basically, a parallel Stream over an Iterable).
> This ParDo is outputting to a side output that is redirected to BigQuery,
> using Streaming Inserts, with the following parameters:
>
> {code:java}
> return BigQueryIO.<T>write()
>     .to(dynamicDestinations)
>     .withMethod(Write.Method.STREAMING_INSERTS)
>     .withCreateDisposition(CreateDisposition.CREATE_NEVER)
>     .withWriteDisposition(WriteDisposition.WRITE_APPEND)
>     .withCustomGcsTempLocation(StaticValueProvider.of(gcsTempLocation))
>     .withFormatFunction(formatFunction)
>     .withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry())
>     .withExtendedErrorInfo();
> {code}
>
> Sometimes, the following error is observed:
>
> {code:java}
> org.apache.beam.sdk.util.UserCodeException: java.util.ConcurrentModificationException
>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
>     at org.apache.beam.sdk.io.gcp.bigquery.CreateTables$CreateTablesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:187)
>     at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>     at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:404)
>     at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1.processElement(PrepareWrite.java:85)
>     at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
>     at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>     at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:404)
>     at org.apache.beam.sdk.transforms.FlatMapElements$2.processElement(FlatMapElements.java:156)
>     at org.apache.beam.sdk.transforms.FlatMapElements$2$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:187)
>     at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>     at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
>     at <<Some Parallel Custom Code.java>>
>     at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
>     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>     at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
>     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
>     at <<Some Parallel Custom Code.java>>
>     at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
>     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>     at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:952)
>     at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:926)
>     at java.base/java.util.stream.AbstractTask.compute(AbstractTask.java:327)
>     at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746)
>     at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
>     at java.base/java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:408)
>     at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:736)
>     at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:919)
>     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
>     at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
>     at <<Some Parallel Custom Code.java>>
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
>     at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>     at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:404)
>     at <<Some Parallel Custom Code.java>>
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:187)
>     at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>     at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:185)
>     at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:108)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1058)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:445)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:130)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1061)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:932)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:373)
>     at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:96)
>     at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:43)
>     at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
>     at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
>     at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:137)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:212)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
>     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1445)
>     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
>     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1120)
>     at org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:133)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.util.ConcurrentModificationException
>     at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1134)
>     at org.apache.beam.sdk.io.gcp.bigquery.CreateTables$CreateTablesFn.processElement(CreateTables.java:104)
> {code}
>
> Unfortunately, I tried to come up with a toy example to post here, but I could not reproduce the issue in it.
>
> Investigating the CreateTables class, I saw that it instantiates one _Map<DestinationT, TableDestination> destinations = Maps.newHashMap()_ per bundle, and _processElement_ can run concurrently, driven by the user's code.
> The current implementation of that map is a plain HashMap. I believe that either synchronizing the _destinations.computeIfAbsent_ call or simply initializing the map with _Maps.newConcurrentMap()_ would be enough.
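>
> For reference, the user code involved has roughly the shape sketched below. All names and types here are invented for illustration (this is not the actual pipeline); the point is that the side output is emitted from the parallel Stream's worker threads, which is what ends up driving the fused downstream DoFns, including _CreateTablesFn_, from multiple threads at once, as the stack trace above shows.
>
> {code:java}
> import java.util.List;
>
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.TupleTag;
>
> // Hypothetical sketch only -- names, types, and the tag are made up for illustration.
> class FanOutFn extends DoFn<KV<String, List<String>>, String> {
>
>   // Side output that is later written to BigQuery with the BigQueryIO.write() shown above.
>   static final TupleTag<String> BQ_ROWS_TAG = new TupleTag<String>() {};
>
>   @ProcessElement
>   public void processElement(ProcessContext c) {
>     c.element().getValue().parallelStream()               // parallelism inside the DoFn
>         .map(value -> c.element().getKey() + ":" + value) // per-item work
>         .forEach(row -> c.output(BQ_ROWS_TAG, row));      // emits from ForkJoin worker threads
>   }
> }
> {code}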
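>
> To illustrate why I believe initializing _destinations_ with _Maps.newConcurrentMap()_ (i.e. a ConcurrentHashMap) would be enough, here is a small standalone sketch, not the actual CreateTables code, that mimics the per-bundle destination cache: _ConcurrentHashMap.computeIfAbsent_ can be called from many threads at once, computes each mapping exactly once, and does not throw ConcurrentModificationException the way the plain HashMap in the stack trace does.
>
> {code:java}
> import java.util.Map;
> import java.util.concurrent.ConcurrentHashMap;
> import java.util.concurrent.atomic.AtomicInteger;
> import java.util.stream.IntStream;
>
> // Standalone illustration (not Beam code) of the per-bundle destination cache pattern.
> public class ConcurrentComputeIfAbsentDemo {
>   public static void main(String[] args) {
>     Map<String, String> destinations = new ConcurrentHashMap<>();
>     AtomicInteger createCalls = new AtomicInteger();
>
>     // Simulate many concurrent processElement calls resolving a small set of destinations.
>     IntStream.range(0, 100_000).parallel().forEach(i -> {
>       String destination = "dataset.table_" + (i % 8);
>       destinations.computeIfAbsent(destination, d -> {
>         // In CreateTablesFn this is where the table would be created, once per destination.
>         createCalls.incrementAndGet();
>         return d;
>       });
>     });
>
>     // Both print 8: each destination is computed exactly once, with no
>     // ConcurrentModificationException.
>     System.out.println("cached destinations: " + destinations.size());
>     System.out.println("create calls: " + createCalls.get());
>   }
> }
> {code}
>
> Synchronizing around the existing HashMap's _computeIfAbsent_ would also fix the race, but it serializes every lookup through one lock, while a concurrent map largely avoids locking once the destinations are already cached.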
>
> I am open to implementing this change and am leaning towards simply moving to a ConcurrentHashMap, so I am just checking whether I am missing any detail.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)