[
https://issues.apache.org/jira/browse/BEAM-7938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17239327#comment-17239327
]
Blaize Julien commented on BEAM-7938:
-------------------------------------
I have the same error with Dataflow 2.25.0 when I try to use the Wait.on() method.
{code:java}
Pipeline pipeline = Pipeline.create(datastoreToBigQueryOptions);
PCollection<Entity> entityCollection = pipeline.apply("Read datastore", reader);
PCollection<TableRow> mappedCollection =
    entityCollection.apply(
        "Transform to TableRow",
        MapElements.into(TypeDescriptor.of(TableRow.class))
            .via(new FormatTableRowFunction(domain)));
mappedCollection.apply("Write to BQ", writer);
// We reuse the same entityCollection to delete the entities,
// but we need to wait for the transformation to finish first.
entityCollection
    .apply("Wait before delete", Wait.on(mappedCollection))
    .apply("Delete datastore",
        DatastoreIO.v1().deleteEntity().withProjectId(projectId));
pipeline.run().waitUntilFinish();
{code}
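One workaround that might apply here (an untested sketch on my part, assuming the failing coder is the one inferred for the Wait.on() output): set the coder explicitly to the concrete generated class, so nothing ever has to reflect on the Message interface. Entity below is com.google.datastore.v1.Entity from the snippet above.
{code:java}
// Untested sketch: pin ProtoCoder to the concrete Entity class on the
// Wait.on() output instead of relying on coder inference.
entityCollection
    .apply("Wait before delete", Wait.on(mappedCollection))
    .setCoder(ProtoCoder.of(Entity.class)) // Entity is a generated class,
                                           // so getDefaultInstance() exists
    .apply("Delete datastore",
        DatastoreIO.v1().deleteEntity().withProjectId(projectId));
{code}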
> ProtoCoder <T extends Message> throws NoSuchMethodException:
> com.google.protobuf.Message.getDefaultInstance()
> -------------------------------------------------------------------------------------------------------------
>
> Key: BEAM-7938
> URL: https://issues.apache.org/jira/browse/BEAM-7938
> Project: Beam
> Issue Type: Bug
> Components: extensions-java-protobuf, io-java-files, io-java-gcp,
> io-java-parquet
> Affects Versions: 2.14.0
> Reporter: Lien Michiels
> Priority: P3
> Labels: newbie, starter
>
> h3. Context
> I have a Beam pipeline running on Dataflow using the Java SDK. It pulls
> Proto wrapper messages from a PubSub subscription, partitions them by the
> oneof value, and then applies a MapElements to extract the underlying Proto
> message, so that I end up with a PCollectionList<T extends Message>. I then
> do some more processing and try to write the results to different sinks.
> BigQueryIO works absolutely fine. However, when I try to use PubsubIO or
> ParquetIO, I end up with this error when using FileIO (for Parquet):
>
> {code:java}
> java.lang.IllegalArgumentException: java.lang.NoSuchMethodException: com.google.protobuf.Message.getDefaultInstance()
> org.apache.beam.sdk.extensions.protobuf.ProtoCoder.getParser(ProtoCoder.java:288)
> org.apache.beam.sdk.extensions.protobuf.ProtoCoder.decode(ProtoCoder.java:192)
> org.apache.beam.sdk.extensions.protobuf.ProtoCoder.decode(ProtoCoder.java:108)
> org.apache.beam.runners.dataflow.worker.WindmillKeyedWorkItem.lambda$elementsIterable$2(WindmillKeyedWorkItem.java:107)
> org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators$7.transform(Iterators.java:750)
> org.apache.beam.vendor.guava.v20_0.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
> java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
> java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner$LateDataFilter.filter(LateDataDroppingDoFnRunner.java:128)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:76)
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1287)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1024)
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> java.base/java.lang.Thread.run(Thread.java:834)
> {code}
>
> and this for PubsubIO:
>
> {code:java}
> java.lang.IllegalArgumentException: java.lang.NoSuchMethodException: com.google.protobuf.Message.getDefaultInstance()
> org.apache.beam.sdk.extensions.protobuf.ProtoCoder.getParser(ProtoCoder.java:288)
> org.apache.beam.sdk.extensions.protobuf.ProtoCoder.decode(ProtoCoder.java:192)
> org.apache.beam.sdk.extensions.protobuf.ProtoCoder.decode(ProtoCoder.java:108)
> org.apache.beam.runners.dataflow.worker.WindmillKeyedWorkItem.lambda$elementsIterable$2(WindmillKeyedWorkItem.java:107)
> org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators$7.transform(Iterators.java:750)
> org.apache.beam.vendor.guava.v20_0.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
> java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
> java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner$LateDataFilter.filter(LateDataDroppingDoFnRunner.java:128)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:76)
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1287)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1024)
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> java.base/java.lang.Thread.run(Thread.java:834)
> {code}
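>
> My reading of the traces (an interpretation on my part, not confirmed):
> because the extraction step types the elements as the Message interface
> rather than as a concrete generated class, Beam infers a ProtoCoder for
> Message itself, and that coder's reflective lookup of getDefaultInstance()
> has nothing to find. A minimal sketch of how such an interface-typed
> collection can arise; the wrappers collection and unwrap helper are
> placeholders, not my real code:
>
> {code:java}
> // Hypothetical sketch: typing the MapElements output as the Message
> // interface leaves coder inference with ProtoCoder.of(Message.class).
> PCollection<Message> extracted =
>     wrappers.apply(
>         "Extract payload",
>         MapElements.into(TypeDescriptor.of(Message.class))
>             .via(wrapper -> unwrap(wrapper))); // placeholder helper
> {code}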
>
> h3. Source Code for Error
> ProtoCoder.java, lines 278-292:
> https://github.com/apache/beam/blob/968a80611d424764962e79b726bfa4fd17ced104/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java#L278
> {code:java}
> /** Get the memoized {@link Parser}, possibly initializing it lazily. */
> private Parser<T> getParser() {
>   if (memoizedParser == null) {
>     try {
>       @SuppressWarnings("unchecked")
>       T protoMessageInstance =
>           (T) protoMessageClass.getMethod("getDefaultInstance").invoke(null);
>       @SuppressWarnings("unchecked")
>       Parser<T> tParser = (Parser<T>) protoMessageInstance.getParserForType();
>       memoizedParser = tParser;
>     } catch (IllegalAccessException
>         | InvocationTargetException
>         | NoSuchMethodException e) {
>       throw new IllegalArgumentException(e);
>     }
>   }
>   return memoizedParser;
> }
> {code}
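> That reflective call is exactly what the exception message points at: the
> static getDefaultInstance() factory is generated on each concrete proto
> class, but the com.google.protobuf.Message interface itself never declares
> it. A two-line illustration, using the well-known Empty type shipped with
> protobuf-java:
> {code:java}
> // Succeeds: Empty is a generated class with the static factory.
> com.google.protobuf.Empty.getDefaultInstance();
>
> // Throws the checked NoSuchMethodException: the Message interface has no
> // such static method, which is the failure seen in getParser() above.
> com.google.protobuf.Message.class.getMethod("getDefaultInstance");
> {code}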
> h3. Potential Solution?
> I am guessing the "hack" proposed on Stack Overflow
> ([https://stackoverflow.com/questions/44134712/generic-protobuf-deserializer-in-java])
> could _potentially_ solve the problem:
> {code:java}
> ParameterizedType pType = (ParameterizedType) subclass.getGenericSuperclass();
> Class<T> tClass = (Class<T>) pType.getActualTypeArguments()[0];
> // In the case where the constructor for `T` takes no arguments.
> parser = tClass.newInstance().getParserForType();
> {code}
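> One caveat with that snippet (my understanding, worth double-checking):
> tClass.newInstance() needs an accessible no-argument constructor, and
> generated protobuf classes keep their constructors private, so reflecting
> on the concrete class's static getDefaultInstance(), or simply setting the
> coder explicitly, may be more practical. An untested sketch of the
> explicit-coder route, where MyConcreteProto stands in for one of my
> generated classes:
> {code:java}
> // Hypothetical: pin the coder per partition so inference never sees the
> // bare Message interface. MyConcreteProto is a placeholder class name.
> PCollection<MyConcreteProto> typed =
>     partitioned.get(0)
>         .apply(
>             "Extract concrete proto",
>             MapElements.into(TypeDescriptor.of(MyConcreteProto.class))
>                 .via(wrapper -> wrapper.getMyConcreteProto()))
>         .setCoder(ProtoCoder.of(MyConcreteProto.class));
> {code}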
>
> Thank you for taking the time to read this. I'd be more than happy to
> contribute to a solution, but I'm not a Proto/Beam (or even Java)
> super-user, so I would need some assistance.
>
>