Hi Stephan,

Thanks very much for your detailed reply.

*## StreamingFileSink does not support writers that take a path*

The FLIP is "Filesystem connector in Table"; it is about building up Flink
Table's capabilities. But I think Hive is important: I see that most users
use Flink or Spark to write data from Kafka to Hive. For streaming writes,
both engines are convenient and popular. I mean, Flink is not only a Hive
runtime, but also an important part of the offline data warehouse.
The problem is that StreamingFileSink does not support Hadoop record
writers. Yes, we can support the formats one by one, and I see the community
is integrating ORC [1]. But it is really not easy, and we have to be careful
to maintain compatibility, because downstream users analyze the data with
other computing engines.
Yes, exposing "RecoverableFsDataOutputStream" to writers is good for
subsequent optimization [2]. But in many cases it is enough for users to
generate new files at each checkpoint. They care more about whether it works
at all and whether there is a compatibility risk.
Therefore, RecordWriter is used here.
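
To make "generate new files at the checkpoint" concrete, here is a minimal
sketch in plain Java. It is not Flink or Hive API: the class
CheckpointRollingWriter and the file naming are hypothetical, and it assumes
a file system with a cheap atomic rename, which does not hold for object
stores such as S3. It also publishes the file directly at the checkpoint and
leaves out recovery after a failure.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical writer that owns a path and starts a new file after every checkpoint. */
class CheckpointRollingWriter {
    private final Path dir;
    private int fileIndex = 0;
    private Path inProgress;
    private BufferedWriter out;

    CheckpointRollingWriter(Path dir) {
        this.dir = dir;
    }

    /** Appends one record, lazily opening a hidden in-progress file. */
    void write(String record) throws IOException {
        if (out == null) {
            inProgress = dir.resolve(".part-" + fileIndex + ".inprogress");
            out = Files.newBufferedWriter(inProgress, StandardCharsets.UTF_8);
        }
        out.write(record);
        out.newLine();
    }

    /**
     * Called at a checkpoint: closes the current file and publishes it by rename.
     * Returns the published path, or null if nothing was written since the last call.
     */
    Path onCheckpoint() throws IOException {
        if (out == null) {
            return null;
        }
        out.close();
        out = null;
        Path published = dir.resolve("part-" + fileIndex++);
        Files.move(inProgress, published, StandardCopyOption.ATOMIC_MOVE);
        return published;
    }
}
```

Because the writer only needs a path, any format writer that takes a path
could sit behind write() in place of the BufferedWriter used here.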

*## External HDFS access*

This includes the Hadoop configuration and Kerberos-related setup.

*## Partition commit*

Committing a partition means notifying the downstream application that the
partition has finished writing and is ready to be read. The common way is to
add a success file or update the metastore. Of course, there are other ways
to notify, so we need to provide flexible mechanisms.
As you mentioned, yes, we can extend "StreamingFileSink" for this part.
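
One way to picture a flexible commit mechanism is a small pluggable hook.
The sketch below is only an illustration: the names PartitionCommitPolicy,
SuccessFilePolicy, RecordingMetastorePolicy and PartitionCommitter are
hypothetical, and the metastore policy merely records partition names
instead of calling a real catalog.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical hook invoked once a partition has finished writing. */
interface PartitionCommitPolicy {
    void commit(Path partitionDir) throws IOException;
}

/** Marks a partition as complete by creating an empty _SUCCESS file in it. */
class SuccessFilePolicy implements PartitionCommitPolicy {
    @Override
    public void commit(Path partitionDir) throws IOException {
        Path marker = partitionDir.resolve("_SUCCESS");
        if (!Files.exists(marker)) {
            Files.createFile(marker);
        }
    }
}

/** Stand-in for a metastore update; a real policy would call the catalog here. */
class RecordingMetastorePolicy implements PartitionCommitPolicy {
    final List<String> registered = new ArrayList<>();

    @Override
    public void commit(Path partitionDir) {
        registered.add(partitionDir.getFileName().toString());
    }
}

/** Runs the configured policies in order, so several notifications can be combined. */
class PartitionCommitter {
    private final List<PartitionCommitPolicy> policies;

    PartitionCommitter(List<PartitionCommitPolicy> policies) {
        this.policies = policies;
    }

    void commit(Path partitionDir) throws IOException {
        for (PartitionCommitPolicy policy : policies) {
            policy.commit(partitionDir);
        }
    }
}
```

Other ways of notifying downstream consumers would be further
implementations of the same interface.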

*## Batch / Streaming Unification*

Yes, it is about exactly-once versus a single commit at the end. There are
also some differences that come from the input being bounded. For example,
batch can support sorting: you can sort by partition, which reduces the
number of writers that are open at the same time. Without sorting, dynamic
partition writing in batch may see records for many partitions in no
particular order.
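
The effect of sorting on the number of open writers can be shown with a toy
count. This is a simplified model, not Flink code: OpenWriterCount is a
hypothetical name, and it assumes that an unsorted job keeps one writer open
per partition until the end of the input.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model: how many partition writers are open at the same time. */
class OpenWriterCount {

    /** Unsorted input: a writer is opened per partition and kept until the end. */
    static int peakWritersUnsorted(List<String> partitionKeys) {
        Set<String> open = new HashSet<>();
        int peak = 0;
        for (String key : partitionKeys) {
            open.add(key);
            peak = Math.max(peak, open.size());
        }
        return peak;
    }

    /** Sorted input: the previous writer is closed as soon as the key changes. */
    static int peakWritersSorted(List<String> partitionKeys) {
        List<String> sorted = new ArrayList<>(partitionKeys);
        Collections.sort(sorted);
        String current = null;
        int peak = 0;
        for (String key : sorted) {
            if (!key.equals(current)) {
                current = key; // close the old writer, open exactly one new one
                peak = 1;
            }
        }
        return peak;
    }
}
```

For the keys b, a, c, a, b the unsorted count is 3 and the sorted count is 1.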

[1] https://issues.apache.org/jira/browse/FLINK-10114
[2] https://issues.apache.org/jira/browse/FLINK-11499

Best,
Jingsong Lee

On Tue, Mar 17, 2020 at 8:00 PM LakeShen <shenleifight...@gmail.com> wrote:

> Hi Jingsong,
>
> I am looking forward to this feature, because some streaming applications
> need to transfer their messages to HDFS for offline analysis.
>
> Best wishes,
> LakeShen
>
> Stephan Ewen <se...@apache.org> wrote on Tue, Mar 17, 2020 at 7:42 PM:
>
> > I would really like to see us converging the stack and the functionality
> > here.
> > Meaning to try and use the same sinks in the Table API as for the
> > DataStream API, and using the same sink for batch and streaming.
> >
> > The StreamingFileSink has a lot of things that can help with that. If
> > possible, it would be nice to extend it (which would help move towards the
> > above goal) rather than build a second sink. Building a second sink leads
> > us further away from unification.
> >
> > I am a bit puzzled by the statement that sinks are primarily for Hive. The
> > Table API should not be coupled to Hive; it should be an independent
> > batch/streaming API for many use cases, with very good support for batch
> > and streaming interplay. Supporting Hive is great, but we should not be
> > building this towards Hive, as just yet another Hive runtime. Why "yet
> > another Hive runtime" when we have a unique streaming engine that can
> > do much more? We would drop our own strength and reduce ourselves to a
> > limited subset.
> >
> > Let's build a File Sink that can also support Hive, but can do so much
> > more. For example, efficient streaming file ingestion as materialized views
> > from changelogs.
> >
> >
> > *## Writing Files in Streaming*
> >
> > To write files in streaming, I don't see another way than using the
> > streaming file sink. If you want to write files across checkpoints, support
> > exactly-once, and support consistent "stop with savepoint", it is not
> > trivial.
> >
> > A part of the complexity comes from the fact that not all targets are
> > actually file systems, and not all have simple semantics for persistence.
> > S3 for example does not support renames (only copies, which may take a lot
> > of time) and it does not support flush/sync of data (the S3 file system in
> > Hadoop exposes that but it does not work: flush/sync, followed by a
> > failure, leads to data loss). You need to devise a separate protocol for
> > that, which is exactly what has already been done and abstracted behind the
> > recoverable writers.
> >
> > If you re-engineer that in the, you will end up either missing many things
> > (intermediate persistence on different file systems, and atomic commit in
> > the absence of renames, etc.), or you end up doing something similar to
> > what the recoverable writers do.
> >
> >
> > *## Atomic Commit in Batch*
> >
> > For batch sinks, it is also desirable to write the data first and then
> > atomically commit it once the job is done.
> > Hadoop has spent a lot of time making this work, see this doc here,
> > specifically the section on 'The "Magic" Committer'. [1]
> >
> > What Flink has built in the RecoverableWriter is in some way an even better
> > version of this, because it works without extra files (we pass data through
> > checkpoint state) and it supports not only committing once at the end, but
> > also committing intermediate parts multiple times during checkpoints.
> >
> > This means that using the recoverable writer mechanism in batch would allow
> > us to immediately get the efficient atomic commit implementations on file://,
> > hdfs:// and s3://, with a well-defined way to implement it for other
> > file systems as well.
> >
> >
> > *## Batch / Streaming Unification*
> >
> > It would be great to start looking at these things in the same way:
> >   - streaming (exactly-once): commits files (after finished) at the next
> > checkpoint
> >   - batch: single commit at the end of the job
> >
> >
> > *## DataStream / Table API Stack Unification*
> >
> > Having the same set of capabilities would make it much easier for users to
> > understand the system.
> > Especially when it comes to consistent behavior across external systems.
> > Having a different file sink in Table API and DataStream API means that
> > DataStream can write correctly to S3 while Table API cannot.
> >
> >
> > *## What is missing?*
> >
> > It seems there are some things that get in the way of naturally
> > Can you make a list of what features are missing in the StreamingFileSink
> > that would make it usable for the use cases you have in mind?
> >
> > Best,
> > Stephan
> >
> > [1]
> >
> >
> https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committer_architecture.html
> >
> >
> > On Mon, Mar 16, 2020 at 12:31 PM Jingsong Li <jingsongl...@gmail.com>
> > wrote:
> >
> > > Hi Piotr,
> > >
> > > I am quite torn.
> > >
> > > Let me re-list the table streaming sink requirements:
> > > - In table, maybe 90% of sinks are for Hive. Parquet and ORC are the most
> > > important formats. Hive provides RecordWriters; it is easy to support all
> > > Hive formats by using them, and we don't need to worry about Hive version
> > > compatibility either, but they cannot work with FSDataOutputStream.
> > > - A Hive table may use an external HDFS. This means Hive has its own Hadoop
> > > configuration.
> > > - In table, partition commit is needed. We cannot just move files; updating
> > > the catalog is important to complete the table semantics.
> > >
> > > You are right that the DataStream and Table streaming sinks will not be
> > > fully compatible, each with its own set of limitations, quirks and features.
> > > But if we reuse the DataStream sink, batch and streaming will also not be
> > > fully compatible. Providing a unified experience for batch and streaming is
> > > also important.
> > >
> > > Table and DataStream have different concerns, and they tilt in
> different
> > > directions.
> > >
> > > Of course, it would be very good to see a unified implementation that solves
> > > the batch sink and Hive issues, unifying the DataStream batch sink, the
> > > DataStream streaming sink, the Table batch sink and the Table streaming sink.
> > >
> > > Let's see what others think.
> > >
> > > Best,
> > > Jingsong Lee
> > >
> > >
> > > On Mon, Mar 16, 2020 at 4:15 PM Piotr Nowojski <pi...@ververica.com>
> > > wrote:
> > >
> > > > Hi Jingsong,
> > > >
> > > > > First way is reusing Batch sink in FLINK-14254, It has handled the
> > > > partition and metastore logic well.
> > > > > - unify batch and streaming
> > > > > - Using FileOutputFormat is consistent with FileInputFormat.
> > > > > - Add exactly-once related logic. Just 200+ lines code.
> > > > > - It's natural to support more table features, like partition
> commit,
> > > > auto compact and etc..
> > > > >
> > > > > Second way is reusing Datastream StreamingFileSink:
> > > > > - unify streaming sink between table and Datastream.
> > > > > - It maybe hard to introduce table related features to
> > > StreamingFileSink.
> > > > >
> > > > > I prefer the first way a little. What do you think?
> > > >
> > > > I would be surprised if adding “exactly-once related logic” is just
> 200
> > > > lines of code. There are things like multi part file upload to s3 and
> > > there
> > > > are also some pending features like [1]. I would suggest to
> ask/involve
> > > > Klou in this discussion.
> > > >
> > > > If it’s as easy to support exactly-once streaming with current batch
> > > sink,
> > > > that begs the question, why do we need to maintain StreamingFileSink?
> > > >
> > > > The worst possible outcome from my perspective would be if we had another
> > > > example of an operator/logic implemented independently in both the
> > > > DataStream API and the Table API, because I’m pretty sure they would not be
> > > > fully compatible, each with its own set of limitations, quirks and features.
> > > > Especially since unifying such operators is on our long-term roadmap and
> > > > wish list.
> > > >
> > > > Piotrek
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-11499 <
> > > > https://issues.apache.org/jira/browse/FLINK-11499>
> > > >
> > > > > On 16 Mar 2020, at 06:55, Jingsong Li <jingsongl...@gmail.com>
> > wrote:
> > > > >
> > > > > Thanks Jinhai for involving.
> > > > >
> > > > >> we need add 'connector.sink.username' for UserGroupInformation
> when
> > > data
> > > > > is written to HDFS
> > > > >
> > > > > Yes, I am not an expert of HDFS, but it seems we need do this
> "doAs"
> > in
> > > > the
> > > > > code for access external HDFS. I will update document.
> > > > >
> > > > > Best,
> > > > > Jingsong Lee
> > > > >
> > > > > On Mon, Mar 16, 2020 at 12:01 PM Jingsong Li <
> jingsongl...@gmail.com
> > >
> > > > wrote:
> > > > >
> > > > >> Thanks Piotr and Yun for involving.
> > > > >>
> > > > >> Hi Piotr and Yun, for implementation,
> > > > >>
> > > > >> FLINK-14254 [1] introduce batch sink table world, it deals with
> > > > partitions
> > > > >> thing, metastore thing and etc.. And it just reuse
> > Dataset/Datastream
> > > > >> FileInputFormat and FileOutputFormat. Filesystem can not do
> without
> > > > >> FileInputFormat, because it need deal with file things, split
> > things.
> > > > Like
> > > > >> orc and parquet, they need read whole file and have different
> split
> > > > logic.
> > > > >>
> > > > >> So back to file system connector:
> > > > >> - It needs introducing FilesystemTableFactory,
> FilesystemTableSource
> > > and
> > > > >> FilesystemTableSink.
> > > > >> - For sources, reusing Dataset/Datastream FileInputFormats, there
> > are
> > > no
> > > > >> other interface to finish file reading.
> > > > >>
> > > > >> For file sinks:
> > > > >> - Batch sink use FLINK-14254
> > > > >> - Streaming sink has two ways.
> > > > >>
> > > > >> First way is reusing Batch sink in FLINK-14254, It has handled the
> > > > >> partition and metastore logic well.
> > > > >> - unify batch and streaming
> > > > >> - Using FileOutputFormat is consistent with FileInputFormat.
> > > > >> - Add exactly-once related logic. Just 200+ lines code.
> > > > >> - It's natural to support more table features, like partition
> > commit,
> > > > auto
> > > > >> compact and etc..
> > > > >>
> > > > >> Second way is reusing Datastream StreamingFileSink:
> > > > >> - unify streaming sink between table and Datastream.
> > > > >> - It maybe hard to introduce table related features to
> > > > StreamingFileSink.
> > > > >>
> > > > >> I prefer the first way a little. What do you think?
> > > > >>
> > > > >> Hi Yun,
> > > > >>
> > > > >>> Watermark mechanism might not be enough.
> > > > >>
> > > > >> Watermarks of subtasks are the same in the "snapshotState".
> > > > >>
> > > > >>> we might need to also do some coordination between subtasks.
> > > > >>
> > > > >> Yes, JobMaster is the role to control subtasks. Metastore is a
> very
> > > > >> fragile single point, which can not be accessed by distributed, so
> > it
> > > is
> > > > >> uniformly accessed by JobMaster.
> > > > >>
> > > > >> [1]https://issues.apache.org/jira/browse/FLINK-14254
> > > > >>
> > > > >> Best,
> > > > >> Jingsong Lee
> > > > >>
> > > > >> On Fri, Mar 13, 2020 at 6:43 PM Yun Gao <yungao...@aliyun.com>
> > wrote:
> > > > >>
> > > > >>>       Hi,
> > > > >>>
> > > > >>>       Very thanks for Jinsong to bring up this discussion! It
> > should
> > > > >>> largely improve the usability after enhancing the FileSystem
> > > connector
> > > > in
> > > > >>> Table.
> > > > >>>
> > > > >>>       I have the same question with Piotr. From my side, I think
> it
> > > > >>> should be better to be able to reuse existing StreamingFileSink.
> I
> > > > think We
> > > > >>> have began
> > > > >>>       enhancing the supported FileFormat (e.g., ORC, Avro...),
> and
> > > > >>> reusing StreamFileSink should be able to avoid repeat work in the
> > > Table
> > > > >>> library. Besides,
> > > > >>>       the bucket concept seems also matches the semantics of
> > > partition.
> > > > >>>
> > > > >>>       For the notification of adding partitions, I'm a little
> > > wondering
> > > > >>> that the Watermark mechanism might not be enough since
> > > Bucket/Partition
> > > > >>> might spans
> > > > >>>       multiple subtasks. It depends on the level of notification:
> > if
> > > we
> > > > >>> want to notify for the bucket on each subtask, using watermark to
> > > > notifying
> > > > >>> each subtask
> > > > >>>       should be ok, but if we want to notifying for the whole
> > > > >>> Bucket/Partition, we might need to also do some coordination
> > between
> > > > >>> subtasks.
> > > > >>>
> > > > >>>
> > > > >>>     Best,
> > > > >>>      Yun
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>>
> ------------------------------------------------------------------
> > > > >>> From:Piotr Nowojski <pi...@ververica.com>
> > > > >>> Send Time:2020 Mar. 13 (Fri.) 18:03
> > > > >>> To:dev <dev@flink.apache.org>
> > > > >>> Cc:user <u...@flink.apache.org>; user-zh <
> user...@flink.apache.org
> > >
> > > > >>> Subject:Re: [DISCUSS] FLIP-115: Filesystem connector in Table
> > > > >>>
> > > > >>> Hi,
> > > > >>>
> > > > >>>
> > > > >>> Which actual sinks/sources are you planning to use in this
> feature?
> > > Is
> > > > it about exposing StreamingFileSink in the Table API? Or do you want
> to
> > > > implement new Sinks/Sources?
> > > > >>>
> > > > >>> Piotrek
> > > > >>>
> > > > >>>> On 13 Mar 2020, at 10:04, jinhai wang <jinhai...@gmail.com>
> > wrote:
> > > > >>>>
> > > > >>>
> > > > >>>> Thanks for FLIP-115. It is really useful feature for platform
> > > > developers who manage hundreds of Flink to Hive jobs in production.
> > > > >>>
> > > > >>>> I think we need add 'connector.sink.username' for
> > > > UserGroupInformation when data is written to HDFS
> > > > >>>>
> > > > >>>>
> > > > > >>>> On 2020/3/13 at 3:33 PM, "Jingsong Li" <jingsongl...@gmail.com> wrote:
> > > > >>>>
> > > > >>>>   Hi everyone,
> > > > >>>>
> > > > >>>
> > > > >>>>   I'd like to start a discussion about FLIP-115 Filesystem
> > connector
> > > > in Table
> > > > >>>>   [1].
> > > > >>>>   This FLIP will bring:
> > > > >>>>   - Introduce Filesystem table factory in table, support
> > > > >>>>   csv/parquet/orc/json/avro formats.
> > > > >>>>   - Introduce streaming filesystem/hive sink in table
> > > > >>>>
> > > > >>>
> > > > >>>>   CC to user mail list, if you have any unmet needs, please feel
> > > free
> > > > to
> > > > >>>>   reply~
> > > > >>>>
> > > > >>>>   Look forward to hearing from you.
> > > > >>>>
> > > > >>>>   [1]
> > > > >>>>
> > > > >>>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
> > > > >>>>
> > > > >>>>   Best,
> > > > >>>>   Jingsong Lee
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>
> > > > >> --
> > > > >> Best, Jingsong Lee
> > > > >>
> > > > >
> > > > >
> > > > > --
> > > > > Best, Jingsong Lee
> > > >
> > > >
> > >
> > > --
> > > Best, Jingsong Lee
> > >
> >
>


-- 
Best, Jingsong Lee
