Either the fix lands in this PR: [BEAM-9113] Fix serialization proto logical types https://github.com/apache/beam/pull/10569
or we all agree to *promote* the logical types to top-level logical types
(as described in the design document, see the ticket): [BEAM-9037] Instant
and duration as logical type https://github.com/apache/beam/pull/10486
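
To make the two options concrete, below is a minimal sketch of the two
workarounds discussed in the thread underneath. The class name
ProtoTimestampLogicalType, the field layout, and the fixed UUID value are
illustrative placeholders, not the actual Beam implementation:

import java.util.UUID;
import org.apache.beam.sdk.schemas.Schema;

// Illustrative sketch: a logical type whose underlying Row schema lives in
// a static field. The static schema is never serialized, so the worker
// re-runs the static initializer and ends up with a schema *without* the
// UUID that SchemaCoder assigned at pipeline-construction time.
public class ProtoTimestampLogicalType {

  private static final Schema TIMESTAMP_SCHEMA =
      Schema.builder().addInt64Field("seconds").addInt32Field("nanos").build();

  // Workaround 1 (as tested): pin a fixed UUID at static initialization so
  // the schema re-created on the worker matches the one serialized from
  // the submission side. The UUID value here is a made-up placeholder.
  static {
    TIMESTAMP_SCHEMA.setUUID(
        UUID.fromString("00000000-0000-0000-0000-000000000001"));
  }

  // Workaround 2: hold the schema in an instance field instead of a static
  // one, so it is serialized together with the logical type, UUID included.
  private final Schema instanceSchema =
      Schema.builder().addInt64Field("seconds").addInt32Field("nanos").build();
}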
 _/
_/ Alex Van Boxel


On Mon, Jan 13, 2020 at 8:40 PM Alex Van Boxel <[email protected]> wrote:

> So I think the following happens:
>
>    1. the schema tree is initialized at construction time. The tree gets
>    serialized and sent to the workers
>    2. the workers deserialize the tree, but as the Timestamp logical type
>    has a *static* schema, the schema will be *re-initialized without the
>    UUID* (as it was never serialized)
>    3. this is why setting a fixed UUID at static initialization works
>
> So the solution is either:
>
>    - as tested, set a fixed UUID
>    - make the schema not static in the logical type
>
> _/
> _/ Alex Van Boxel
>
>
> On Mon, Jan 13, 2020 at 8:08 PM Reuven Lax <[email protected]> wrote:
>
>> SchemaCoder today recursively sets UUIDs for all schemas, including
>> logical types, in setSchemaIds. Is it possible that your changes modified
>> that logic somehow?
>>
>> On Mon, Jan 13, 2020 at 9:39 AM Alex Van Boxel <[email protected]> wrote:
>>
>>> This is the stacktrace:
>>>
>>> java.lang.IllegalStateException
>>>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:491)
>>>   at org.apache.beam.sdk.coders.RowCoderGenerator.getCoder(RowCoderGenerator.java:380)
>>>   at org.apache.beam.sdk.coders.RowCoderGenerator.getCoder(RowCoderGenerator.java:371)
>>>   at org.apache.beam.sdk.coders.RowCoderGenerator.createComponentCoders(RowCoderGenerator.java:337)
>>>   at org.apache.beam.sdk.coders.RowCoderGenerator.generate(RowCoderGenerator.java:140)
>>>   at org.apache.beam.sdk.schemas.SchemaCoder.getDelegateCoder(SchemaCoder.java:159)
>>>   at org.apache.beam.sdk.schemas.SchemaCoder.toString(SchemaCoder.java:204)
>>>   at java.lang.String.valueOf(String.java:2994)
>>>   at java.lang.StringBuilder.append(StringBuilder.java:131)
>>>   at org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
>>>   at org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
>>>   at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:623)
>>>   at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:539)
>>>   at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:400)
>>>   at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:125)
>>>   at org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:64)
>>>   at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
>>>   at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
>>>   at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>>>   at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
>>>   at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:411)
>>>   at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:380)
>>>   at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:305)
>>>   at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
>>>   at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
>>>   at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
>>>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>   at java.lang.Thread.run(Thread.java:748)
>>>
>>> In red is the case for LogicalType (the checkState in
>>> RowCoderGenerator.getCoder). Setting it to a fixed value makes it run on
>>> Dataflow. Note that the pipeline works perfectly on the DirectRunner.
>>>
>>> _/
>>> _/ Alex Van Boxel
>>>
>>>
>>> On Mon, Jan 13, 2020 at 6:06 PM Reuven Lax <[email protected]> wrote:
>>>
>>>> I don't think that should be the case. Also, SchemaCoder will
>>>> automatically set the UUID for such logical types.
>>>>
>>>> On Mon, Jan 13, 2020 at 8:24 AM Alex Van Boxel <[email protected]>
>>>> wrote:
>>>>
>>>>> OK, I've rechecked everything and eventually found the problem. The
>>>>> problem is when you use a LogicalType backed by a Row: then the UUID
>>>>> needs to be set to make it work. (This is the case for proto-based
>>>>> Timestamps.) I'll create a fix.
>>>>>
>>>>> _/
>>>>> _/ Alex Van Boxel
>>>>>
>>>>>
>>>>> On Mon, Jan 13, 2020 at 8:36 AM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> Can you elucidate? All BeamSQL pipelines use schemas, and I believe
>>>>>> those tests are working just fine on the Dataflow runner. In
>>>>>> addition, there are a number of ValidatesRunner schema-aware
>>>>>> pipelines that are running regularly on the Dataflow runner.
>>>>>>
>>>>>> On Sun, Jan 12, 2020 at 1:43 AM Alex Van Boxel <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey all,
>>>>>>>
>>>>>>> has anyone tried master with a *schema-aware pipeline* on Dataflow?
>>>>>>> I'm testing some PRs to see if they run on Dataflow (they work on
>>>>>>> Direct), but they got:
>>>>>>>
>>>>>>> Workflow failed. Causes: The Dataflow job appears to be stuck
>>>>>>> because no worker activity has been seen in the last 1h. You can get
>>>>>>> help with Cloud Dataflow at
>>>>>>>
>>>>>>> Because I got this, I wanted to see if master (without my changes)
>>>>>>> also has the same behaviour.
>>>>>>>
>>>>>>> It's easy to simulate: just read from BigQuery with:
>>>>>>>
>>>>>>> BigQueryIO.readTableRowsWithSchema()
>>>>>>>
>>>>>>> It works with the classic readTableRows().
>>>>>>>
>>>>>>> _/
>>>>>>> _/ Alex Van Boxel
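
For reference, a minimal repro sketch of the Jan 12 report above. The
project/dataset/table name is a placeholder, and pipeline options such as
the runner and temp location are omitted:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class SchemaAwareRepro {
  public static void main(String[] args) {
    Pipeline pipeline =
        Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Schema-aware read: attaches a Beam schema (and hence a SchemaCoder)
    // to the output. This is the variant that gets stuck on Dataflow while
    // running fine on the DirectRunner.
    PCollection<TableRow> rows =
        pipeline.apply(
            BigQueryIO.readTableRowsWithSchema()
                .from("my-project:my_dataset.my_table"));

    // The classic, non-schema read works on both runners:
    //   pipeline.apply(BigQueryIO.readTableRows().from(...));

    pipeline.run();
  }
}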
