This is good news, Yangze. Decreasing the size of our IDs is a really nice
side effect :-)

Hence, +1 from my side as well.

Cheers,
Till

On Tue, Apr 14, 2020 at 9:54 AM Zhu Zhu <reed...@gmail.com> wrote:

> Thanks for doing this benchmark @Yangze Guo <karma...@gmail.com> .
> The result looks promising. So I would +1 to refactor ExecutionAttemptID
> and IntermediateResultPartitionID.
>
> Regarding why 'The size of TDD after serialization became smaller than
> before', I guess it's because the new IntermediateResultPartitionIDs can
> share the same IntermediateDataSetID. This way, the space taken by an
> IntermediateResultPartitionID is a ref (to IntermediateDataSetID) and an
> int (index), which is smaller than 2 longs (AbstractID).
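
A minimal sketch of the effect guessed at above. It is not the benchmark's code and does not use Flink's real classes: `TwoLongId` and `CompositeId` are hypothetical stand-ins for an AbstractID-style ID (two longs) and for a composite ID (a reference to a shared data set ID plus an int index). With plain Java serialization, an object that was already written to the stream is emitted again only as a short back-reference, so many composite IDs sharing one data set ID should serialize smaller than the same number of two-long IDs.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;

class IdSerializationSketch {

    /** Hypothetical stand-in for an AbstractID-style ID: two longs. */
    static class TwoLongId implements Serializable {
        private static final long serialVersionUID = 1L;
        final long lower;
        final long upper;

        TwoLongId(long lower, long upper) {
            this.lower = lower;
            this.upper = upper;
        }
    }

    /** Hypothetical stand-in for the composite ID: shared data set ID plus index. */
    static class CompositeId implements Serializable {
        private static final long serialVersionUID = 1L;
        final TwoLongId dataSetId;
        final int partitionNum;

        CompositeId(TwoLongId dataSetId, int partitionNum) {
            this.dataSetId = dataSetId;
            this.partitionNum = partitionNum;
        }
    }

    /** Serializes the object with plain Java serialization and returns the byte count. */
    static int serializedSize(Serializable obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.size();
    }

    /** n independent two-long IDs, as before the refactoring. */
    static ArrayList<TwoLongId> oldStyleIds(int n) {
        ArrayList<TwoLongId> ids = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            ids.add(new TwoLongId(i, i + 1L));
        }
        return ids;
    }

    /** n composite IDs that all point to the same data set ID instance. */
    static ArrayList<CompositeId> newStyleIds(int n) {
        TwoLongId shared = new TwoLongId(42L, 43L);
        ArrayList<CompositeId> ids = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            ids.add(new CompositeId(shared, i));
        }
        return ids;
    }

    public static void main(String[] args) {
        int n = 1000;
        System.out.println("two-long IDs:  " + serializedSize(oldStyleIds(n)) + " bytes");
        System.out.println("composite IDs: " + serializedSize(newStyleIds(n)) + " bytes");
    }
}
```

The saving only shows up when the IDs are written into the same object stream and really share one data set ID instance; IDs serialized one by one would each carry the full data set ID.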
>
> Thanks,
> Zhu Zhu
>
> Yangze Guo <karma...@gmail.com> wrote on Tue, Apr 14, 2020 at 3:09 PM:
>
> > Hi everyone,
> >
> > I've investigated the impact on jobs with higher parallelism.
> >
> > The result shows:
> > - The size of the TDD after serialization became smaller than before,
> > and I did not hit any issue with the Akka framework when the
> > parallelism was set to 6000.
> > - There was no significant difference in end-to-end scheduling time,
> > job runtime, young GC count, or total full GC time.
> >
> > For details, please take a look at
> > https://docs.google.com/spreadsheets/d/13lt6J29uikWoux79047lkvbGi2fS4ioWHSbvT1kTL2M/edit?usp=sharing
> >
> > From my perspective, it might be OK to refactor
> > ExecutionAttemptID and IntermediateResultPartitionID. If you have
> > further concerns or think we should investigate further, please
> > let me know.
> >
> > Best,
> > Yangze Guo
> >
> > On Mon, Apr 13, 2020 at 1:36 PM Yangze Guo <karma...@gmail.com> wrote:
> > >
> > > Hi everyone,
> > > After an offline discussion with ZhuZhu, I have some comments on this
> > > investigation.
> > >
> > > Regarding the maximum parallelism going from 760 to 685: it may be
> > > because the tasks were not scheduled evenly. The result is inconsistent
> > > across multiple experiments, so this phenomenon is probably unrelated to
> > > our changes.
> > >
> > > I think what we really care about is the Akka framesize (should
> > > users enlarge it after our change for the same job?). The size of the TDD
> > > after serialization seems to be smaller after the change. I don't know the
> > > root cause of this at the moment. The way I measure it is:
> > > ```
> > > ByteArrayOutputStream bos = new ByteArrayOutputStream();
> > > ObjectOutputStream oos = new ObjectOutputStream(bos);
> > > oos.writeObject(deployment);
> > > oos.flush();
> > > LOG.info("BENCHMARK TDD_SERIAL_SIZE {}.", bos.toByteArray().length);
> > > ```
> > > Please correct me if I'm wrong.
> > >
> > > I'll experiment with higher parallelism to see if there is any
> > > regression regarding Akka framesize.
> > >
> > > Regarding the TDD building time, the parallelism in my investigation
> > > might be too small. Meanwhile, this time might be influenced by many
> > > factors. Thus, I'll
> > > - experiment with higher parallelism.
> > > - measure the time spent from "Starting scheduling" to the last task
> > > changing state to running.
> > >
> > > Best,
> > > Yangze Guo
> > >
> > >
> > > On Fri, Apr 10, 2020 at 12:53 PM Yangze Guo <karma...@gmail.com> wrote:
> > > >
> > > > Hi there,
> > > >
> > > > Sorry for the belated reply. I just made a preliminary investigation
> > > > of the impact of refactoring IntermediateResultPartitionID.
> > > >
> > > > The key change is to compose it of an IntermediateDataSetID
> > > > and a partitionNum:
> > > > public class IntermediateResultPartitionID {
> > > >    private final IntermediateDataSetID intermediateDataSetID;
> > > >    private final int partitionNum;
> > > > }
> > > >
> > > > In this benchmark, I use examples/streaming/WordCount.jar as the test
> > > > job and run Flink on Yarn. All the configurations are kept default
> > > > except for "taskmanager.numberOfTaskSlots", which is set to 2.
> > > >
> > > > The result shows it does have an impact on performance.
> > > > - After this change, the maximum parallelism went from 760 to 685,
> > > > which is limited by the total number of network buffers. For the same
> > > > parallelism, the user needs more network buffers than before.
> > > > - The netty messages "PartitionRequest" and "TaskEventRequest" increase
> > > > by 4 bytes. For "PartitionRequest", that means a 7% increase.
> > > > - The TDD takes longer to construct. With a parallelism of 600, it takes
> > > > twice as long to construct the TDD as before.
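
The 4-byte figure is what simple arithmetic gives if an ID is written as its raw fields. That layout is an assumption of this sketch, not a description of Flink's netty encoding, and the helper names are hypothetical: two longs take 16 bytes, while a 16-byte data set ID plus an int partition number takes 20.

```java
import java.nio.ByteBuffer;

class IdWireSizeSketch {

    /** Old layout (assumed): one ID written as two longs. */
    static ByteBuffer writeOldId(long lower, long upper) {
        ByteBuffer buf = ByteBuffer.allocate(2 * Long.BYTES);
        buf.putLong(lower).putLong(upper);
        buf.flip(); // switch from writing to reading so remaining() is the payload size
        return buf;
    }

    /** New layout (assumed): data set ID as two longs, followed by the partition number. */
    static ByteBuffer writeNewId(long dataSetLower, long dataSetUpper, int partitionNum) {
        ByteBuffer buf = ByteBuffer.allocate(2 * Long.BYTES + Integer.BYTES);
        buf.putLong(dataSetLower).putLong(dataSetUpper).putInt(partitionNum);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        int oldBytes = writeOldId(1L, 2L).remaining();
        int newBytes = writeNewId(1L, 2L, 7).remaining();
        // prints: old=16 new=20 delta=4
        System.out.println("old=" + oldBytes + " new=" + newBytes + " delta=" + (newBytes - oldBytes));
    }
}
```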
> > > >
> > > > Details are recorded in
> > > > https://docs.google.com/spreadsheets/d/13lt6J29uikWoux79047lkvbGi2fS4ioWHSbvT1kTL2M/edit?usp=sharing
> > > >
> > > > The same issue could happen with ExecutionAttemptID, which would
> > > > increase "PartitionRequest" and "TaskEventRequest" by 8 bytes
> > > > (subtaskIndex and attemptNumber). But it may not affect the TDD as much
> > > > as IntermediateResultPartitionID, since there is only one
> > > > ExecutionAttemptID per TDD.
> > > >
> > > > After that preliminary investigation, I tend not to refactor
> > > > ExecutionAttemptID and IntermediateResultPartitionID at the moment, or
> > > > to treat it as future work.
> > > >
> > > > WDYT? @ZhuZhu @Till
> > > >
> > > > Best,
> > > > Yangze Guo
> > > >
> > > > On Wed, Apr 1, 2020 at 11:53 AM Zhu Zhu <reed...@gmail.com> wrote:
> > > > >
> > > > > >> However, it seems the JobVertexID is derived from hashcode ...
> > > > > You are right. JobVertexID is widely used and reworking it may affect the
> > > > > public interfaces, e.g. the REST API. We can take it as a long-term goal
> > > > > and exclude it from this FLIP.
> > > > > The same applies to IntermediateDataSetID, which could also be composed of
> > > > > a JobID and an index as Till proposed.
> > > > >
> > > > > Thanks,
> > > > > Zhu Zhu
> > > > >
> > > > > Till Rohrmann <trohrm...@apache.org> wrote on Tue, Mar 31, 2020 at 8:36 PM:
> > > > >
> > > > > > For the IntermediateDataSetID I was just thinking that it might
> > > > > > actually be interesting to know which job produced the result which,
> > > > > > by using cluster partitions, could be used across different jobs. Not
> > > > > > saying that we have to do it, though.
> > > > > >
> > > > > > A small addition to Zhu Zhu's comment about TDD sizes: For the problem
> > > > > > with too large TDDs there is already an issue [1]. The current
> > > > > > suspicion is that the size of TDDs for jobs with a large parallelism
> > > > > > can indeed become problematic for Flink. Hence, it would be great to
> > > > > > investigate the impacts of the proposed changes.
> > > > > >
> > > > > > [1] https://issues.apache.org/jira/browse/FLINK-16069
> > > > > >
> > > > > > Cheers,
> > > > > > Till
> > > > > >
> > > > > > On Tue, Mar 31, 2020 at 11:50 AM Yangze Guo <karma...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi, Zhu,
> > > > > > >
> > > > > > > Thanks for the feedback.
> > > > > > >
> > > > > > > > make JobVertexID a composition of JobID and a topology index
> > > > > > > I think it is a good idea. However, it seems the JobVertexID is
> > > > > > > derived from a hashcode which is used to identify them across
> > > > > > > submissions. I'm not familiar with that component, though. I prefer to
> > > > > > > keep this idea out of the scope of this FLIP if no one can help us
> > > > > > > figure it out.
> > > > > > >
> > > > > > > > How about we still keep IntermediateDataSetID independent from
> > > > > > > > JobVertexID, but just print the producing relationships in logs? I
> > > > > > > > think keeping IntermediateDataSetID independent may be better
> > > > > > > > considering the cross job result usages in interactive query cases.
> > > > > > > I think you are right. I'll keep IntermediateDataSetID independent
> > > > > > > from JobVertexID.
> > > > > > >
> > > > > > > > The new IDs will become larger with this rework.
> > > > > > > Yes, I have the same concern. A benchmark is necessary; I'll try to
> > > > > > > provide one during the implementation phase.
> > > > > > >
> > > > > > >
> > > > > > > Best,
> > > > > > > Yangze Guo
> > > > > > >
> > > > > > > On Tue, Mar 31, 2020 at 4:55 PM Zhu Zhu <reed...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > Thanks for proposing this improvement, Yangze. Big +1 for the overall
> > > > > > > > proposal. It can help a lot in troubleshooting.
> > > > > > > >
> > > > > > > > Here are a few questions:
> > > > > > > > 1. Shall we make JobVertexID a composition of JobID and a topology
> > > > > > > > index? This would help in the session cluster case, so that we can
> > > > > > > > identify which tasks are from which jobs along with the rework of
> > > > > > > > ExecutionAttemptID.
> > > > > > > >
> > > > > > > > 2. You mentioned that "Add the producer info to the string literal of
> > > > > > > > IntermediateDataSetID". Do you mean to make IntermediateDataSetID a
> > > > > > > > composition of JobVertexID and a consumer index?
> > > > > > > > How about we still keep IntermediateDataSetID independent from
> > > > > > > > JobVertexID, but just print the producing relationships in logs? I
> > > > > > > > think keeping IntermediateDataSetID independent may be better
> > > > > > > > considering the cross job result usages in interactive query cases.
> > > > > > > >
> > > > > > > > 3. The new IDs will become larger with this rework. The
> > > > > > > > TaskDeploymentDescriptor can become much larger since it is mainly
> > > > > > > > composed of a variety of IDs. I'm not sure how much larger it would
> > > > > > > > be, but there can be more memory and CPU cost for it, resulting in
> > > > > > > > more frequent GCs, message sizes exceeding Akka frame limits, and a
> > > > > > > > longer blocked time of the main thread.
> > > > > > > > This should not be a problem in most cases but might be a problem for
> > > > > > > > large scale jobs. Shall we have a benchmark for it?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Zhu Zhu
> > > > > > > >
> > > > > > > > Yangze Guo <karma...@gmail.com> wrote on Tue, Mar 31, 2020 at 2:19 PM:
> > > > > > > >
> > > > > > > > > Thank you all for the feedback! Sorry for the belated reply.
> > > > > > > > >
> > > > > > > > > @Till
> > > > > > > > > I'm +1 for your two ideas and I'd like to move these two out of the
> > > > > > > > > scope of this FLIP since the pipelined region scheduling is ongoing
> > > > > > > > > work now.
> > > > > > > > > I also agree that we should not make the InstanceID in
> > > > > > > > > TaskExecutorConnection a composition of the ResourceID plus a
> > > > > > > > > monotonically increasing value. Thanks a lot for your explanation.
> > > > > > > > >
> > > > > > > > > @Konstantin @Yang
> > > > > > > > > Regarding the PodName of TaskExecutor on K8s, I second Yang's
> > > > > > > > > suggestion. It makes sense to me to let users export RESOURCE_ID and
> > > > > > > > > make the TM respect it. Users need to guarantee there is no collision
> > > > > > > > > between different TMs.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Yangze Guo
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Tue, Mar 31, 2020 at 12:25 AM Steven Wu <stevenz...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > +1 on allowing user defined resourceId for taskmanager
> > > > > > > > > >
> > > > > > > > > > On Sun, Mar 29, 2020 at 7:24 PM Yang Wang <danrtsey...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Konstantin,
> > > > > > > > > > >
> > > > > > > > > > > I think it is a good idea. Currently, our users also report a similar
> > > > > > > > > > > issue with the resourceId of standalone clusters. When we start a
> > > > > > > > > > > standalone cluster now, the `TaskManagerRunner` always generates a UUID
> > > > > > > > > > > for the resourceId. It is used to register with the jobmanager and is
> > > > > > > > > > > not convenient to match with the real taskmanager, especially in
> > > > > > > > > > > container environments.
> > > > > > > > > > >
> > > > > > > > > > > I think a possible solution is to support a user-defined resourceId.
> > > > > > > > > > > We could get it from the environment. For standalone on K8s, we could
> > > > > > > > > > > set the "RESOURCE_ID" env to the pod name so that it is easier to match
> > > > > > > > > > > the taskmanager with the K8s pod.
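
A minimal sketch of that lookup, assuming the environment variable is literally named `RESOURCE_ID` as suggested above. The class and method names are hypothetical and this is not Flink's `TaskManagerRunner` code: use the user-provided value when it is set, otherwise fall back to a random UUID as today.

```java
import java.util.Map;
import java.util.UUID;

class ResourceIdFromEnvSketch {

    /** Name of the environment variable proposed in this thread. */
    static final String ENV_KEY = "RESOURCE_ID";

    /**
     * Returns the user-defined resource ID if the environment provides a
     * non-blank one, otherwise a freshly generated UUID string.
     * The caller is responsible for avoiding collisions between TMs.
     */
    static String resolveResourceId(Map<String, String> env) {
        String fromEnv = env.get(ENV_KEY);
        if (fromEnv != null && !fromEnv.trim().isEmpty()) {
            return fromEnv.trim();
        }
        return UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        // On K8s the variable could be populated with the pod name.
        System.out.println(resolveResourceId(System.getenv()));
    }
}
```

Passing the environment in as a map keeps the lookup easy to test without touching the real process environment.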
> > > > > > > > > > >
> > > > > > > > > > > Moreover, I am afraid we could not set the pod name to the resourceId.
> > > > > > > > > > > I think you could set the "deployment.meta.name", since the pod name is
> > > > > > > > > > > generated by K8s in the pattern {deployment.meta.name}-{rc.uuid}-{uuid}.
> > > > > > > > > > > On the contrary, we will set the resourceId to the pod name.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Yang
> > > > > > > > > > >
> > > > > > > > > > > Konstantin Knauf <konstan...@ververica.com> wrote on Sun, Mar 29, 2020 at 8:06 PM:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Yangze, Hi Till,
> > > > > > > > > > > >
> > > > > > > > > > > > thank you for working on this topic. I believe it will make debugging
> > > > > > > > > > > > large Apache Flink deployments much more feasible.
> > > > > > > > > > > >
> > > > > > > > > > > > I was wondering whether it would make sense to allow the user to specify
> > > > > > > > > > > > the Resource ID in standalone setups? For example, many users still
> > > > > > > > > > > > implicitly use standalone clusters on Kubernetes (the native support is
> > > > > > > > > > > > still experimental) and in these cases it would be interesting to also
> > > > > > > > > > > > set the PodName as the ResourceID. What do you think?
> > > > > > > > > > > >
> > > > > > > > > > > > Cheers,
> > > > > > > > > > > >
> > > > > > > > > > > > Konstantin
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Mar 26, 2020 at 6:49 PM Till Rohrmann <trohrm...@apache.org> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Yangze,
> > > > > > > > > > > > >
> > > > > > > > > > > > > thanks for creating this FLIP. I think it is a very good improvement
> > > > > > > > > > > > > helping our users and ourselves better understand what's going on in
> > > > > > > > > > > > > Flink.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Creating the ResourceIDs with host information/pod name is a good idea.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Also deriving ExecutionGraph IDs from their superset ID is a good idea.
> > > > > > > > > > > > >
> > > > > > > > > > > > > The InstanceID is used for fencing purposes. I would not make it a
> > > > > > > > > > > > > composition of the ResourceID + a monotonically increasing number. The
> > > > > > > > > > > > > problem is that in case of a RM failure the InstanceIDs would start from
> > > > > > > > > > > > > 0 again and this could lead to collisions.
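
The collision can be illustrated with a small sketch. The generator below is hypothetical (it is not how Flink creates InstanceIDs) and assumes the counter lives only in the memory of the RM process, so a restarted RM starts counting from 0 again and hands out an ID that was already used before the failure.

```java
import java.util.HashSet;
import java.util.Set;

class InstanceIdCollisionSketch {

    /** Hypothetical generator: ResourceID plus an in-memory, monotonically increasing counter. */
    static class CounterBasedGenerator {
        private long next = 0;

        String newInstanceId(String resourceId) {
            return resourceId + "-" + next++;
        }
    }

    /** Returns true if an ID issued after a simulated RM restart repeats one issued before it. */
    static boolean collidesAfterRestart(String resourceId) {
        Set<String> issued = new HashSet<>();

        CounterBasedGenerator beforeFailure = new CounterBasedGenerator();
        issued.add(beforeFailure.newInstanceId(resourceId));

        // Simulated RM failure: the counter state is lost, a new generator starts at 0.
        CounterBasedGenerator afterFailure = new CounterBasedGenerator();
        String reissued = afterFailure.newInstanceId(resourceId);

        // Set.add returns false when the element is already present.
        return !issued.add(reissued);
    }

    public static void main(String[] args) {
        System.out.println("collision after restart: " + collidesAfterRestart("tm-1"));
    }
}
```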
> > > > > > > > > > > > >
> > > > > > > > > > > > > Logging more information on how the different runtime IDs are correlated
> > > > > > > > > > > > > is also a good idea.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Two other ideas for simplifying the IDs are the following:
> > > > > > > > > > > > >
> > > > > > > > > > > > > * The SlotRequestID was introduced because the SlotPool was a separate
> > > > > > > > > > > > > RpcEndpoint a while ago. With this no longer being the case I think we
> > > > > > > > > > > > > could remove the SlotRequestID and replace it with the AllocationID.
> > > > > > > > > > > > > * Instead of creating new SlotRequestIDs for multi task slots one could
> > > > > > > > > > > > > derive them from the SlotRequestID used for requesting the underlying
> > > > > > > > > > > > > AllocatedSlot.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Given that the slot sharing logic will most likely be reworked with the
> > > > > > > > > > > > > pipelined region scheduling, we might be able to resolve these two points
> > > > > > > > > > > > > as part of the pipelined region scheduling effort.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > Till
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Thu, Mar 26, 2020 at 10:51 AM Yangze Guo <karma...@gmail.com> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi everyone,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > We would like to start a discussion thread on "FLIP-118: Improve
> > > > > > > > > > > > > > Flink’s ID system" [1].
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > This FLIP mainly discusses the following issues, aiming to enhance the
> > > > > > > > > > > > > > readability of IDs in logs and help users debug in case of failures:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > - Enhance the readability of the string literals of IDs. Most of them
> > > > > > > > > > > > > > are hashcodes, e.g. ExecutionAttemptID, which do not provide much
> > > > > > > > > > > > > > meaningful information and are hard for users to recognize and compare.
> > > > > > > > > > > > > > - Log the ID’s lineage information to make debugging more convenient.
> > > > > > > > > > > > > > Currently, the log does not always show the lineage information
> > > > > > > > > > > > > > between IDs. Finding out relationships between entities identified by
> > > > > > > > > > > > > > given IDs is a common demand, e.g., the slot of which AllocationID is
> > > > > > > > > > > > > > assigned to satisfy the slot request with a given SlotRequestID. Without
> > > > > > > > > > > > > > such lineage information, it’s impossible to track the end-to-end
> > > > > > > > > > > > > > lifecycle of an Execution or a Task, which makes debugging difficult.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Key changes proposed in the FLIP are as follows:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > - Add location information to distributed components
> > > > > > > > > > > > > > - Add topology information to graph components
> > > > > > > > > > > > > > - Log the ID’s lineage information
> > > > > > > > > > > > > > - Expose the identifier of distributed components to users
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Please find more details in the FLIP wiki document [1]. Looking forward
> > > > > > > > > > > > > > to your feedback.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148643521
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > Yangze Guo
> > > > > > > > > > > > > >
