It is indeed the first logical type with a Row base type. The UUID is generated from the name of the class; to do the same in code from a string, you first convert the string to bytes and then create the UUID from those bytes.
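To make the string-to-bytes-to-UUID step concrete, here is a minimal sketch using the JDK's UUID.nameUUIDFromBytes, which builds a name-based (version 3) UUID from a byte array. The helper class LogicalTypeIds and the example name string are hypothetical, chosen only for illustration; this is not necessarily the exact code in the PR.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Hypothetical helper, for illustration only: derives a stable UUID from a name.
final class LogicalTypeIds {
  private LogicalTypeIds() {}

  // Convert the string to bytes first, then build a name-based UUID from them.
  // The same input string always yields the same UUID.
  static UUID uuidFromName(String name) {
    byte[] bytes = name.getBytes(StandardCharsets.UTF_8);
    return UUID.nameUUIDFromBytes(bytes);
  }

  // Convenience overload that uses the fully qualified class name as the input.
  static UUID uuidFromClass(Class<?> type) {
    return uuidFromName(type.getName());
  }

  public static void main(String[] args) {
    // The name below is an assumed example, not a real Beam class.
    System.out.println(uuidFromName("com.example.TimestampLogicalType"));
    System.out.println(uuidFromClass(String.class));
  }
}
```

Because the UUID depends only on the name, the construction-time JVM and every worker would compute the same value without it having to be serialized.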
_/
_/ Alex Van Boxel

On Mon, Jan 13, 2020 at 10:40 PM Brian Hulette <[email protected]> wrote:

> I guess these are the first logical types we've defined with a base type
> of row. It does seem reasonable that a static schema for a logical type
> could have some fixed id, but it feels odd to have a fixed UUID; it would
> be nice if we could give the schema some meaningful static identifier.
>
> I think Reuven was investigating adding support for "named" schemas in
> order to add support for recursive schemas, but ran into some issues. Maybe
> something like that is what we need here?
>
> On Mon, Jan 13, 2020 at 12:26 PM Alex Van Boxel <[email protected]> wrote:
>
>> Fix in this PR:
>>
>> [BEAM-9113] Fix serialization proto logical types
>> https://github.com/apache/beam/pull/10569
>>
>> or we all agree to *promote* the logical types to top-level logical
>> types (as described in the design document, see ticket):
>>
>> [BEAM-9037] Instant and duration as logical type
>> https://github.com/apache/beam/pull/10486
>>
>> _/
>> _/ Alex Van Boxel
>>
>>
>> On Mon, Jan 13, 2020 at 8:40 PM Alex Van Boxel <[email protected]> wrote:
>>
>>> So I think the following happens:
>>>
>>> 1. the schema tree is initialized at construction time. The tree gets
>>> serialized and sent to the workers
>>> 2. the workers deserialize the tree, but as the Timestamp logical
>>> type has a *static* schema, the schema will be *re-initialized
>>> without the UUID* (as it was never serialized)
>>> 3. this is why setting a fixed UUID at static initialization works
>>>
>>> So the solution is either:
>>>
>>> - as tested, set a fixed UUID
>>> - make the schema not static in the logical type
>>>
>>> _/
>>> _/ Alex Van Boxel
>>>
>>>
>>> On Mon, Jan 13, 2020 at 8:08 PM Reuven Lax <[email protected]> wrote:
>>>
>>>> SchemaCoder today recursively sets UUIDs for all schemas, including
>>>> logical types, in setSchemaIds. Is it possible that your changes modified
>>>> that logic somehow?
>>>>
>>>> On Mon, Jan 13, 2020 at 9:39 AM Alex Van Boxel <[email protected]>
>>>> wrote:
>>>>
>>>>> This is the stacktrace:
>>>>>
>>>>> java.lang.IllegalStateException
>>>>>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:491)
>>>>>   at org.apache.beam.sdk.coders.RowCoderGenerator.getCoder(RowCoderGenerator.java:380)
>>>>>   at org.apache.beam.sdk.coders.RowCoderGenerator.getCoder(RowCoderGenerator.java:371)
>>>>>   at org.apache.beam.sdk.coders.RowCoderGenerator.createComponentCoders(RowCoderGenerator.java:337)
>>>>>   at org.apache.beam.sdk.coders.RowCoderGenerator.generate(RowCoderGenerator.java:140)
>>>>>   at org.apache.beam.sdk.schemas.SchemaCoder.getDelegateCoder(SchemaCoder.java:159)
>>>>>   at org.apache.beam.sdk.schemas.SchemaCoder.toString(SchemaCoder.java:204)
>>>>>   at java.lang.String.valueOf(String.java:2994)
>>>>>   at java.lang.StringBuilder.append(StringBuilder.java:131)
>>>>>   at org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
>>>>>   at org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
>>>>>   at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:623)
>>>>>   at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:539)
>>>>>   at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:400)
>>>>>   at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:125)
>>>>>   at org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:64)
>>>>>   at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
>>>>>   at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
>>>>>   at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>>>>>   at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
>>>>>   at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:411)
>>>>>   at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:380)
>>>>>   at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:305)
>>>>>   at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
>>>>>   at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
>>>>>   at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
>>>>>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>   at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>> In red is the case for LogicalType. Setting it to a fixed value makes
>>>>> it run on Dataflow. Note that the pipeline works perfectly on DirectRunner.
>>>>>
>>>>> _/
>>>>> _/ Alex Van Boxel
>>>>>
>>>>>
>>>>> On Mon, Jan 13, 2020 at 6:06 PM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> I don't think that should be the case. Also SchemaCoder will
>>>>>> automatically set the UUID for such logical types.
>>>>>>
>>>>>> On Mon, Jan 13, 2020 at 8:24 AM Alex Van Boxel <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> OK, I've rechecked everything and eventually found the problem. The
>>>>>>> problem is that when you use a LogicalType backed by a Row, the UUID
>>>>>>> needs to be set to make it work (this is the case for Proto-based
>>>>>>> Timestamps). I'll create a fix.
>>>>>>>
>>>>>>> _/
>>>>>>> _/ Alex Van Boxel
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jan 13, 2020 at 8:36 AM Reuven Lax <[email protected]> wrote:
>>>>>>>
>>>>>>>> Can you elucidate? All BeamSQL pipelines use schemas and I believe
>>>>>>>> those tests are working just fine on the Dataflow runner. In addition,
>>>>>>>> there are a number of ValidatesRunner schema-aware pipelines that are
>>>>>>>> running regularly on the Dataflow runner.
>>>>>>>>
>>>>>>>> On Sun, Jan 12, 2020 at 1:43 AM Alex Van Boxel <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hey all,
>>>>>>>>>
>>>>>>>>> has anyone tried master with a *schema aware pipeline* on Dataflow?
>>>>>>>>> I'm testing some PRs to see if they run on Dataflow (as they are
>>>>>>>>> working on Direct) but they got:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Workflow failed. Causes: The Dataflow job appears to be stuck
>>>>>>>>> because no worker activity has been seen in the last 1h. You can get
>>>>>>>>> help with Cloud Dataflow at
>>>>>>>>>
>>>>>>>>> Because I got this, I wanted to see if master (without my changes)
>>>>>>>>> also has the same behaviour.
>>>>>>>>>
>>>>>>>>> It's easy to simulate: just read from BigQuery with:
>>>>>>>>>
>>>>>>>>> BigQueryIO.readTableRowsWithSchema()
>>>>>>>>>
>>>>>>>>> It works with the classic readTableRows().
>>>>>>>>>
>>>>>>>>> _/
>>>>>>>>> _/ Alex Van Boxel
>>>>>>>>>
>>>>>>>>
