I guess these are the first logical types we've defined with a base type of
row. It does seem reasonable that a static schema for a logical type could
have some fixed id, but it feels odd to have a fixed UUID, it would be nice
if we could give the schema some meaningful static identifier.

I think Reuven was investigating adding support for "named" schemas in
order to add support for recursive schemas, but ran into some issues. Maybe
something like that is what we need here?

On Mon, Jan 13, 2020 at 12:26 PM Alex Van Boxel <[email protected]> wrote:

> Fix in this PR:
>
> [BEAM-9113] Fix serialization proto logical types
> https://github.com/apache/beam/pull/10569
>
> or we all agree to *promote* the logical types to top-level logical types
> (as described in the design document, see ticket):
>
> [BEAM-9037] Instant and duration as logical type
> https://github.com/apache/beam/pull/10486
>
>
>
>  _/
> _/ Alex Van Boxel
>
>
> On Mon, Jan 13, 2020 at 8:40 PM Alex Van Boxel <[email protected]> wrote:
>
>> So I think the following happens:
>>
>>    1. the schema tree is initialized at construction time. The tree get
>>    serialized and send to the workers
>>    2. the workers deserialize the tree, but as the Timestamp logical
>>    type have a logical type with a *static* schema the schema will be 
>> *re-initialized
>>    without the UUID* (as it was never serialized)
>>    3. this is why setting a fixed UUID at static initialization works
>>
>> So solution is
>>
>>    - as tested, se a fixed UUID
>>    - make the schema not static in the logical type
>>
>>  _/
>> _/ Alex Van Boxel
>>
>>
>> On Mon, Jan 13, 2020 at 8:08 PM Reuven Lax <[email protected]> wrote:
>>
>>> SchemaCoder today recursively sets UUIDs for all schemas, including
>>> logical types, in setSchemaIds. Is it possible that your changes modified
>>> that logic somehow?
>>>
>>> On Mon, Jan 13, 2020 at 9:39 AM Alex Van Boxel <[email protected]> wrote:
>>>
>>>> This is the stacktrace:
>>>>
>>>>
>>>> java.lang.IllegalStateException at
>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:491)
>>>> at
>>>> org.apache.beam.sdk.coders.RowCoderGenerator.getCoder(RowCoderGenerator.java:380)
>>>> at
>>>> org.apache.beam.sdk.coders.RowCoderGenerator.getCoder(RowCoderGenerator.java:371)
>>>> at
>>>> org.apache.beam.sdk.coders.RowCoderGenerator.createComponentCoders(RowCoderGenerator.java:337)
>>>> at
>>>> org.apache.beam.sdk.coders.RowCoderGenerator.generate(RowCoderGenerator.java:140)
>>>> at
>>>> org.apache.beam.sdk.schemas.SchemaCoder.getDelegateCoder(SchemaCoder.java:159)
>>>> at org.apache.beam.sdk.schemas.SchemaCoder.toString(SchemaCoder.java:204)
>>>> at java.lang.String.valueOf(String.java:2994) at
>>>> java.lang.StringBuilder.append(StringBuilder.java:131) at
>>>> org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
>>>> at
>>>> org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
>>>> at
>>>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:623)
>>>> at
>>>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:539)
>>>> at
>>>> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:400)
>>>> at
>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:125)
>>>> at
>>>> org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:64)
>>>> at
>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
>>>> at
>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
>>>> at
>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>>>> at
>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
>>>> at
>>>> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:411)
>>>> at
>>>> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:380)
>>>> at
>>>> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:305)
>>>> at
>>>> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
>>>> at
>>>> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
>>>> at
>>>> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> In red is the case for LogicalType. Setting it to a fixed value makes
>>>> it run on Dataflow. Note that the pipeline works perfect on DirectRunner.
>>>>
>>>>  _/
>>>> _/ Alex Van Boxel
>>>>
>>>>
>>>> On Mon, Jan 13, 2020 at 6:06 PM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> I don't think that should be the case. Also SchemaCoder will
>>>>> automatically set the UUID for such logical types.
>>>>>
>>>>> On Mon, Jan 13, 2020 at 8:24 AM Alex Van Boxel <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> OK, I've rechecked everything and eventually found the problem. The
>>>>>> problem is when you use a LogicalType backed back a Row, then the UUID
>>>>>> needs to be set to make it work. (this is the case for Proto based
>>>>>> Timestamps). I'll create a fix.
>>>>>>
>>>>>>  _/
>>>>>> _/ Alex Van Boxel
>>>>>>
>>>>>>
>>>>>> On Mon, Jan 13, 2020 at 8:36 AM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> Can you elucidate? All BeamSQL pipelines use schemas and I believe
>>>>>>> those test are working just fine on the Dataflow runner. In addition, 
>>>>>>> there
>>>>>>> are a number of ValidatesRunner schema-aware pipelines that are running
>>>>>>> regularly on the Dataflow runner.
>>>>>>>
>>>>>>> On Sun, Jan 12, 2020 at 1:43 AM Alex Van Boxel <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hey all,
>>>>>>>>
>>>>>>>> anyone tried master with a *schema aware pipeline* on Dataflow?
>>>>>>>> I'm testing some PR's to see if the run on Dataflow (as they are 
>>>>>>>> working on
>>>>>>>> Direct) but they got:
>>>>>>>>
>>>>>>>>
>>>>>>>> Workflow failed. Causes: The Dataflow job appears to be stuck
>>>>>>>> because no worker activity has been seen in the last 1h. You can get 
>>>>>>>> help
>>>>>>>> with Cloud Dataflow at
>>>>>>>>
>>>>>>>> because I got this I wanted to see if master (without my changes)
>>>>>>>> also have the same behaviour.
>>>>>>>>
>>>>>>>> It's easy to simulate: Just read for BigQuery with:
>>>>>>>>
>>>>>>>> BigQueryIO.readTableRowsWithSchema()
>>>>>>>>
>>>>>>>> it works with the classic: readTableRows().
>>>>>>>>
>>>>>>>>  _/
>>>>>>>> _/ Alex Van Boxel
>>>>>>>>
>>>>>>>

Reply via email to