Igor,

I think your diagnosis is spot on.

As for a workaround, I see two options:
- set pipeline.auto-generate-uids=true, which is probably not what you are
looking for
- skip the FlinkSink builder and write your own glue code that assigns UIDs

As for the fix, we can probably add a `uid` method to the FlinkSink
builder. FlinkSink would then always set the UIDs of its three operators as
"{uid}-writer", "{uid}-committer" and "{uid}-sink". If `uid` is not provided,
it defaults to the table name.
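Roughly, the naming logic I have in mind looks like the sketch below. It is only an illustration of the proposal: `SinkUidScheme` and `operatorUids` are made-up names, not an existing Iceberg API, and treating a null `uid` as "not provided" is an assumption.

```java
import java.util.List;
import java.util.Objects;

/**
 * Sketch of the proposed FlinkSink uid scheme.
 * Hypothetical names only; this is not part of the Iceberg API.
 */
public final class SinkUidScheme {

    private SinkUidScheme() {}

    /**
     * Derives UIDs for the writer, committer and sink operators, in that order.
     * Assumption: a null uid means "not provided", so the table name is used.
     */
    public static List<String> operatorUids(String uid, String tableName) {
        Objects.requireNonNull(tableName, "tableName");
        String prefix = (uid != null) ? uid : tableName;
        return List.of(prefix + "-writer", prefix + "-committer", prefix + "-sink");
    }

    public static void main(String[] args) {
        // Explicit uid supplied through the (proposed) builder method.
        System.out.println(operatorUids("orders", "db.orders"));
        // prints [orders-writer, orders-committer, orders-sink]

        // No uid supplied: fall back to the table name.
        System.out.println(operatorUids(null, "db.orders"));
        // prints [db.orders-writer, db.orders-committer, db.orders-sink]
    }
}
```

Because the UIDs come from a stable name rather than being auto-generated, they stay the same across restarts, which is what state restoration relies on. One consequence of the table-name default: two sinks writing to the same table in one job would end up with identical UIDs, so an explicit `uid` would be needed in that case.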

Thanks,
Steven

On Tue, Jun 8, 2021 at 10:02 AM Igor Basov <mrbaz...@gmail.com> wrote:

> Hello,
> I'm using Flink 1.11 with Iceberg 0.11.
> I use `pipeline.auto-generate-uids: false` in my Flink configuration to
> enforce assigning UIDs to operators, so that the job could be safely
> stopped and the state restored from the latest checkpoint.
> But when I use the Iceberg FlinkSink, I get this error:
>
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The 
> main method caused an error: Auto generated UIDs have been disabled but no 
> UID or hash has been assigned to operator IcebergStreamWriter
>
> I believe the problem is in this piece of code inside FlinkSink.java, where
> neither transform has a UID assigned.
>
>       DataStream<Void> returnStream = rowDataInput
>           .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(WriteResult.class), streamWriter)
>           .setParallelism(writeParallelism)
>           .transform(ICEBERG_FILES_COMMITTER_NAME, Types.VOID, filesCommitter)
>           .setParallelism(1)
>           .setMaxParallelism(1);
>
> If it's the case, is there a workaround for this issue?
>
