Hi Jingsong & Guojun,

Users can configure SINK_MANAGED_WRITER_BUFFER_WEIGHT in a SQL job, for
example:
`INSERT INTO paimon_table
/*+ OPTIONS('sink.use-managed-memory-allocator'='true',
'sink.managed.writer-buffer-weight'='256') */ SELECT ... FROM ...;`

Flink first divides managed memory among the Operator, StateBackend and
Python use cases according to their weights of 70, 70 and 30. After that,
Flink normalizes the weights of the specific operators and derives each
operator's memory from the Operator share of managed memory.

Users can configure the agg/join/sort operator weights for Flink batch jobs
with the options `table.exec.resource.hash-agg.memory`,
`table.exec.resource.hash-join.memory` and
`table.exec.resource.sort.memory`; the default weight for each of them is
128. For window operators in Flink streaming jobs, there is a constant
weight of 100.
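
As a rough illustration of the calculation (the numbers here are made up,
and note that Flink only counts the use cases a job actually needs): if a
slot has 170 MB of managed memory and all three use cases are present, the
Operator share is 170 MB * 70 / (70 + 70 + 30) = 70 MB. If the job then
contains a Paimon sink with weight 256 and a sort with weight 128, the sink
writer buffer gets 70 MB * 256 / (256 + 128) ≈ 46.7 MB.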

Compared to these operators, I think the default weight of 256 is
sufficient for the sink operator. If the writer buffer runs out of memory,
users need to increase the managed memory size. We can describe this
clearly in the documentation.

Best,
Shammon FY
On Tue, Apr 25, 2023 at 12:22 PM Jingsong Li <[email protected]> wrote:
> Looks good to me!
>
> On Tue, Apr 25, 2023 at 12:12 PM Shammon FY <[email protected]> wrote:
> >
> > Hi Jingsong
> >
> > I agree with you that, from a performance perspective, there is indeed no
> > need to create segments for Paimon based on Flink segments. If we convert
> > Flink MemorySegment to Paimon directly, I think we should remove the
> > `free` method in Paimon's `MemorySegment`:
> > 1. As you mentioned above, Paimon won't free segments itself.
> > 2. It avoids users mistakenly calling the free method and releasing the
> > off-heap memory twice.
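> >
> > Something like the following shape (just a sketch, the class name is made
> > up and this is not the actual Paimon class): the segment only wraps
> > engine-allocated memory and deliberately has no free() method.
> >
> > public final class EngineOwnedSegment {
> >     private final byte[] heapMemory; // non-null for heap segments
> >     private final long address;      // raw address for off-heap segments
> >
> >     public EngineOwnedSegment(byte[] heapMemory, long address) {
> >         this.heapMemory = heapMemory;
> >         this.address = address;
> >     }
> >
> >     // read/write accessors would go here; deliberately no free():
> >     // releasing the memory is the allocating engine's responsibility.
> > }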
> >
> > In addition, we need to declare in Paimon's `MemorySegment` that off-heap
> > memory must be allocated and released by the engine. What do you think?
> >
> > Best,
> > Shammon FY
> >
> >
> > On Tue, Apr 25, 2023 at 9:53 AM Jingsong Li <[email protected]> wrote:
> >
> > > About Guojun's concerns,
> > >
> > > How to configure SINK_MANAGED_WRITER_BUFFER_WEIGHT?
> > > How does it allocate memory with sort and agg? What is the best value
> > > to assign?
> > >
> > > > users can configure managed memory weights for AGG and window
> > > > operators for Flink jobs
> > >
> > > How to?
> > >
> > > I think we can add more explanation.
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Tue, Apr 25, 2023 at 9:46 AM Jingsong Li <[email protected]> wrote:
> > > >
> > > > Hi Shammon,
> > > >
> > > > But nobody releases the Paimon MemorySegment.
> > > >
> > > > I think we can have a clear definition here: Flink's memory is
> > > > managed by Flink.
> > > >
> > > > Introducing an interface here would have a big impact on performance;
> > > > Flink did a lot of testing and optimization early on to avoid
> > > > interface invocations as much as possible.
> > > >
> > > > Best,
> > > > Jingsong
> > > >
> > > > On Sun, Apr 23, 2023 at 5:51 PM Shammon FY <[email protected]> wrote:
> > > > >
> > > > > Thanks for all the feedback.
> > > > >
> > > > > To Jingsong
> > > > > > Maybe we can just use some reflection method to get
> > > > > > offHeapBuffer and heapMemory from Flink MemorySegment.
> > > > >
> > > > > Yes, we can indeed construct a MemorySegment for Paimon in this
> > > > > way, but this approach may cause double-release issues for the
> > > > > segment. Assume Flink has allocated a piece of off-heap memory, and
> > > > > Paimon gets the off-heap buffer and creates its own MemorySegment;
> > > > > the same off-heap buffer is then held by both the Flink
> > > > > MemorySegment and the Paimon MemorySegment. When the off-heap
> > > > > buffer is released in Paimon with `UNSAFE.freeMemory(this.address)`,
> > > > > it may be released again by Flink's MemorySegment.
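> > > > >
> > > > > For illustration, the reflection approach would look roughly like
> > > > > this (a sketch; the field name "offHeapBuffer" is the one you
> > > > > mentioned from Flink's MemorySegment):
> > > > >
> > > > > import java.lang.reflect.Field;
> > > > > import java.nio.ByteBuffer;
> > > > >
> > > > > static ByteBuffer getOffHeapBuffer(
> > > > >         org.apache.flink.core.memory.MemorySegment segment)
> > > > >         throws ReflectiveOperationException {
> > > > >     Field field = org.apache.flink.core.memory.MemorySegment.class
> > > > >             .getDeclaredField("offHeapBuffer");
> > > > >     field.setAccessible(true);
> > > > >     // Two segments now reference the same native memory; if Paimon
> > > > >     // frees it and Flink frees it again, that is a double release.
> > > > >     return (ByteBuffer) field.get(segment);
> > > > > }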
> > > > >
> > > > > To Liming
> > > > > > Is it possible to cause a deadlock when requesting memory from
> > > > > > the engine's managed memory? Is it necessary to add some memory
> > > > > > checking or timeout mechanism here?
> > > > >
> > > > > Flink allocates segments for parallel tasks in MemoryManager. When
> > > > > the memory usage in MemoryManager hits the limit, it throws an
> > > > > exception instead of blocking, so it will not deadlock.
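> > > > >
> > > > > A rough sketch of that behavior against Flink's MemoryManager API
> > > > > (the helper name and its parameters are placeholders):
> > > > >
> > > > > import java.util.List;
> > > > > import org.apache.flink.core.memory.MemorySegment;
> > > > > import org.apache.flink.runtime.memory.MemoryAllocationException;
> > > > > import org.apache.flink.runtime.memory.MemoryManager;
> > > > >
> > > > > static List<MemorySegment> tryAllocate(
> > > > >         MemoryManager memoryManager, Object owner, int numPages) {
> > > > >     try {
> > > > >         return memoryManager.allocatePages(owner, numPages);
> > > > >     } catch (MemoryAllocationException e) {
> > > > >         // Thrown when the request exceeds the remaining managed
> > > > >         // memory, so a misconfigured job fails fast rather than
> > > > >         // deadlocking.
> > > > >         throw new RuntimeException(e);
> > > > >     }
> > > > > }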
> > > > >
> > > > > To Guojun
> > > > > > One question I'm thinking about is whether this will raise the
> > > > > > bar for maintaining write performance on Paimon, e.g. how to
> > > > > > decide an appropriate memory weight for users' writing jobs.
> > > > >
> > > > > Currently, users can configure managed memory weights for AGG and
> > > > > window operators in Flink jobs, which is similar to the writer
> > > > > buffer pool weight configured in Paimon. So for Flink users, I
> > > > > think this will not be a problem.
> > > > >
> > > > > Best,
> > > > > Shammon FY
> > > > >
> > > > >
> > > > > On Fri, Apr 21, 2023 at 11:42 AM Guojun Li <[email protected]> wrote:
> > > > >
> > > > > > Hi Shammon,
> > > > > >
> > > > > > Thank you for writing up the proposal. It's great to introduce
> > > > > > this unified memory management for Paimon!
> > > > > >
> > > > > > One question I'm thinking about is whether this will raise the
> > > > > > bar for maintaining write performance on Paimon, e.g. how to
> > > > > > decide an appropriate memory weight for users' writing jobs.
> > > > > >
> > > > > > Thanks,
> > > > > > Guojun
> > > > > >
> > > > > > On Thu, Apr 20, 2023 at 8:45 PM Ming Li <[email protected]> wrote:
> > > > > >
> > > > > > > Thanks Shammon for the proposal.
> > > > > > >
> > > > > > > For me it is more appropriate to leave the memory management
> > > > > > > to the computing engine.
> > > > > > >
> > > > > > > But I have a small question about this proposal. If the
> > > > > > > engine's memory is not configured properly, is it possible to
> > > > > > > cause a deadlock when requesting memory from the engine's
> > > > > > > managed memory? Is it necessary to add some memory checking or
> > > > > > > timeout mechanism here?
> > > > > > >
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Ming Li
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Apr 19, 2023 at 9:57 AM Shammon FY <[email protected]> wrote:
> > > > > > >
> > > > > > > > Hi devs:
> > > > > > > >
> > > > > > > > I would like to start a discussion of PIP-1: Improve Shared
> > > > > > > > Writer Buffer Pool For Sink [1]. Currently, a Paimon sink
> > > > > > > > task creates a heap memory pool which is shared by writers.
> > > > > > > > When there are multiple tasks in an Executor, this may cause
> > > > > > > > full GC, performance issues and even OOM.
> > > > > > > >
> > > > > > > > This PIP aims to improve the buffer pool for writers in
> > > > > > > > Paimon tasks. Paimon tasks can create memory pools based on
> > > > > > > > Executor Memory which will be managed by the Executor, such
> > > > > > > > as Managed Memory in a Flink TaskManager. This will improve
> > > > > > > > the stability and performance of sinks by managing writer
> > > > > > > > buffers for multiple tasks through the Executor.
> > > > > > > >
> > > > > > > > Looking forward to your feedback, thanks.
> > > > > > > >
> > > > > > > >
> > > > > > > > [1]
> > > > > > > > https://cwiki.apache.org/confluence/display/PAIMON/PIP-1%3A+Improve+Shared+Writer+Buffer+Pool+For+Sink
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Shammon FY