This is good news, Yangze. Decreasing the size of our IDs is a really nice side effect :-)
Hence, +1 from my side as well.

Cheers,
Till

On Tue, Apr 14, 2020 at 9:54 AM Zhu Zhu <reed...@gmail.com> wrote:

> Thanks for doing this benchmark, @Yangze Guo <karma...@gmail.com>.
> The result looks promising. So I would +1 refactoring ExecutionAttemptID
> and IntermediateResultPartitionID.
>
> Regarding why "the size of the TDD after serialization becomes smaller than
> before", I guess it's because the new IntermediateResultPartitionIDs can
> share the same IntermediateDataSetID. In this way, the space taken by an
> IntermediateResultPartitionID is a reference (to the IntermediateDataSetID)
> plus an int (the index), which is smaller than two longs (AbstractID).
>
> Thanks,
> Zhu Zhu
>
> Yangze Guo <karma...@gmail.com> wrote on Tue, Apr 14, 2020 at 3:09 PM:
>
> > Hi everyone,
> >
> > I've investigated the impact on jobs with higher parallelism.
> >
> > The result shows:
> > - The size of the TDD after serialization becomes smaller than before,
> >   and I did not run into any issue with the Akka framework when the
> >   parallelism was set to 6000.
> > - There was no significant difference regarding the end-to-end
> >   scheduling time, job runtime, young GC count, and total full GC time.
> >
> > For details, please take a look at
> > https://docs.google.com/spreadsheets/d/13lt6J29uikWoux79047lkvbGi2fS4ioWHSbvT1kTL2M/edit?usp=sharing
> >
> > From my perspective, I think it might be OK to refactor
> > ExecutionAttemptID and IntermediateResultPartitionID. If you have
> > further concerns or think we should investigate further, please
> > let me know.
> >
> > Best,
> > Yangze Guo
> >
> > On Mon, Apr 13, 2020 at 1:36 PM Yangze Guo <karma...@gmail.com> wrote:
> > >
> > > Hi everyone,
> > > After an offline discussion with ZhuZhu, I have some comments on this
> > > investigation.
> > >
> > > Regarding the maximum parallelism dropping from 760 to 685, it may be
> > > because the tasks are not scheduled evenly. The result is inconsistent
> > > across multiple experiments.
> > > So this phenomenon is likely unrelated to our changes.
> > >
> > > I think what we really care about is the Akka frame size (should
> > > users enlarge it for the same job after our change?). The size of the
> > > TDD after serialization seems to be smaller after the change. I don't
> > > know the root cause of this phenomenon at the moment. The way I
> > > measure it is:
> > > ```
> > > ByteArrayOutputStream bos = new ByteArrayOutputStream();
> > > ObjectOutputStream oos = new ObjectOutputStream(bos);
> > > oos.writeObject(deployment);
> > > oos.flush();
> > > LOG.info("BENCHMARK TDD_SERIAL_SIZE {}.", bos.toByteArray().length);
> > > ```
> > > Please correct me if I'm wrong.
> > >
> > > I'll experiment with higher parallelism to see if there is any
> > > regression regarding the Akka frame size.
> > >
> > > Regarding the TDD building time, the parallelism in my investigation
> > > might be too small. Meanwhile, this time might be influenced by many
> > > factors. Thus, I'll
> > > - experiment with higher parallelism.
> > > - measure the time spent from "Starting scheduling" until the last task
> > >   switches to RUNNING.
> > >
> > > Best,
> > > Yangze Guo
> > >
> > >
> > > On Fri, Apr 10, 2020 at 12:53 PM Yangze Guo <karma...@gmail.com> wrote:
> > > >
> > > > Hi there,
> > > >
> > > > Sorry for the belated reply. I just made a preliminary investigation
> > > > of the impact of refactoring IntermediateResultPartitionID.
> > > >
> > > > The key change is making it composed of an IntermediateDataSetID
> > > > and a partitionNum.
> > > > public class IntermediateResultPartitionID {
> > > >     private final IntermediateDataSetID intermediateDataSetID;
> > > >     private final int partitionNum;
> > > > }
> > > >
> > > > In this benchmark, I use examples/streaming/WordCount.jar as the test
> > > > job and run Flink on Yarn. All configurations are kept at their defaults
> > > > except for "taskmanager.numberOfTaskSlots", which is set to 2.
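Zhu Zhu's explanation for the smaller serialized TDD (many partition IDs sharing one IntermediateDataSetID) can be checked with a small experiment. The sketch below is a hypothetical reconstruction, not Flink code: `LongPairId` stands in for an AbstractID-style ID made of two longs, and `ComposedPartitionId` stands in for the proposed IntermediateResultPartitionID that references a data set ID plus an int index. It measures size the same way as the snippet in the thread, with `ObjectOutputStream`. Java serialization writes a short back-reference handle for any object already written to the same stream, so many composed IDs pointing at one shared data set ID should take fewer bytes than the same number of independent two-long IDs. This assumes the real ID classes use default Java serialization and actually share the same data set ID instance, which may not hold.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.UUID;

public class SerializedSizeSketch {

    // Hypothetical stand-in for an AbstractID-style ID: two random longs.
    static final class LongPairId implements Serializable {
        private static final long serialVersionUID = 1L;
        final long upper;
        final long lower;

        LongPairId(long upper, long lower) {
            this.upper = upper;
            this.lower = lower;
        }

        static LongPairId random() {
            UUID u = UUID.randomUUID();
            return new LongPairId(u.getMostSignificantBits(), u.getLeastSignificantBits());
        }
    }

    // Hypothetical composed partition ID: a reference to a data set ID plus an int index.
    static final class ComposedPartitionId implements Serializable {
        private static final long serialVersionUID = 1L;
        final LongPairId dataSetId;
        final int partitionNum;

        ComposedPartitionId(LongPairId dataSetId, int partitionNum) {
            this.dataSetId = dataSetId;
            this.partitionNum = partitionNum;
        }
    }

    // Same measurement idea as in the thread: Java-serialize the object and count the bytes.
    static int serializedSize(Object obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.size();
    }

    // n independent IDs, each carrying its own 16 bytes of payload.
    static ArrayList<Object> randomIds(int n) {
        ArrayList<Object> ids = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            ids.add(LongPairId.random());
        }
        return ids;
    }

    // n composed IDs that all point to the same data set ID instance.
    static ArrayList<Object> composedIds(int n) {
        LongPairId shared = LongPairId.random();
        ArrayList<Object> ids = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            ids.add(new ComposedPartitionId(shared, i));
        }
        return ids;
    }

    public static void main(String[] args) {
        int n = 1000;
        System.out.println("independent IDs: " + serializedSize(randomIds(n)) + " bytes");
        System.out.println("composed IDs:    " + serializedSize(composedIds(n)) + " bytes");
    }
}
```

For a single ID the composed form is larger (it adds a second class descriptor and an int), so the saving only appears when many partition IDs referencing the same data set ID are written into one stream.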
> > > >
> > > > The result shows it does have an impact on performance.
> > > > - After this change, the maximum parallelism went from 760 to 685,
> > > >   limited by the total number of network buffers. For the same
> > > >   parallelism, users need more network buffers than before.
> > > > - The Netty messages "PartitionRequest" and "TaskEventRequest" grow
> > > >   by 4 bytes. For "PartitionRequest", that is a 7% increase.
> > > > - The TDD takes longer to construct. With a parallelism of 600, it
> > > >   takes twice as long to construct the TDD as before.
> > > >
> > > > Details are recorded in
> > > > https://docs.google.com/spreadsheets/d/13lt6J29uikWoux79047lkvbGi2fS4ioWHSbvT1kTL2M/edit?usp=sharing
> > > >
> > > > The same issue could happen with ExecutionAttemptID, which would
> > > > increase "PartitionRequest" and "TaskEventRequest" by 8 bytes
> > > > (subtaskIndex and attemptNumber). But it may not affect the TDD as
> > > > much as IntermediateResultPartitionID, since there is only one
> > > > ExecutionAttemptID per TDD.
> > > >
> > > > After this preliminary investigation, I tend not to refactor
> > > > ExecutionAttemptID and IntermediateResultPartitionID at the moment,
> > > > or to treat it as future work.
> > > >
> > > > WDYT? @ZhuZhu @Till
> > > >
> > > > Best,
> > > > Yangze Guo
> > > >
> > > > On Wed, Apr 1, 2020 at 11:53 AM Zhu Zhu <reed...@gmail.com> wrote:
> > > > >
> > > > > >> However, it seems the JobVertexID is derived from hashcode ...
> > > > > You are right. JobVertexID is widely used, and reworking it may
> > > > > affect the public interfaces, e.g. the REST API. We can take it as a
> > > > > long-term goal and exclude it from this FLIP.
> > > > > The same applies to IntermediateDataSetID, which can also be
> > > > > composed of a JobID and an index, as Till proposed.
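The +4 and +8 byte figures in Yangze's April 10 measurements follow from simple arithmetic if the IDs are written as fixed-width fields. In the hypothetical encoding below (method names are illustrative, not Flink's actual Netty serializers), an AbstractID-style ID is two longs, 16 bytes; adding an int `partitionNum` gives 20 bytes, and adding `subtaskIndex` and `attemptNumber` ints gives 24 bytes. The latter assumes the composed ExecutionAttemptID is a 16-byte vertex ID plus those two ints.

```java
import java.nio.ByteBuffer;
import java.util.function.Consumer;

public class IdWireSizeSketch {

    // Hypothetical fixed-width encoding of an AbstractID-style ID: two longs, 16 bytes.
    static void writeLongPairId(ByteBuffer buf, long upper, long lower) {
        buf.putLong(upper);
        buf.putLong(lower);
    }

    // Hypothetical composed partition ID: data set ID (16 bytes) + partitionNum (4 bytes).
    static void writeComposedPartitionId(ByteBuffer buf, long upper, long lower, int partitionNum) {
        writeLongPairId(buf, upper, lower);
        buf.putInt(partitionNum);
    }

    // Hypothetical composed attempt ID: vertex ID (16 bytes) + subtaskIndex + attemptNumber (4 bytes each).
    static void writeComposedAttemptId(ByteBuffer buf, long upper, long lower,
                                       int subtaskIndex, int attemptNumber) {
        writeLongPairId(buf, upper, lower);
        buf.putInt(subtaskIndex);
        buf.putInt(attemptNumber);
    }

    // Runs a writer against a scratch buffer and reports how many bytes it wrote.
    static int encodedSize(Consumer<ByteBuffer> writer) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        writer.accept(buf);
        return buf.position();
    }

    public static void main(String[] args) {
        int oldId = encodedSize(b -> writeLongPairId(b, 1L, 2L));
        int partitionId = encodedSize(b -> writeComposedPartitionId(b, 1L, 2L, 7));
        int attemptId = encodedSize(b -> writeComposedAttemptId(b, 1L, 2L, 7, 0));
        System.out.println("partition ID: " + oldId + " -> " + partitionId + " bytes");
        System.out.println("attempt ID: " + oldId + " -> " + attemptId + " bytes");
    }
}
```

Running `main` prints `partition ID: 16 -> 20 bytes` and `attempt ID: 16 -> 24 bytes`, matching the reported growth per message.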
> > > > >
> > > > > Thanks,
> > > > > Zhu Zhu
> > > > >
> > > > > Till Rohrmann <trohrm...@apache.org> wrote on Tue, Mar 31, 2020 at 8:36 PM:
> > > > >
> > > > > > For the IntermediateDataSetID, I was just thinking that it might
> > > > > > actually be interesting to know which job produced the result,
> > > > > > which, by using cluster partitions, could be used across different
> > > > > > jobs. Not saying that we have to do it, though.
> > > > > >
> > > > > > A small addition to Zhu Zhu's comment about TDD sizes: for the
> > > > > > problem of too-large TDDs there is already an issue [1]. The current
> > > > > > suspicion is that the size of TDDs for jobs with a large
> > > > > > parallelism can indeed become problematic for Flink. Hence, it
> > > > > > would be great to investigate the impact of the proposed changes.
> > > > > >
> > > > > > [1] https://issues.apache.org/jira/browse/FLINK-16069
> > > > > >
> > > > > > Cheers,
> > > > > > Till
> > > > > >
> > > > > > On Tue, Mar 31, 2020 at 11:50 AM Yangze Guo <karma...@gmail.com> wrote:
> > > > > > >
> > > > > > > Hi, Zhu,
> > > > > > >
> > > > > > > Thanks for the feedback.
> > > > > > >
> > > > > > > > make JobVertexID a composition of JobID and a topology index
> > > > > > > I think it is a good idea. However, it seems the JobVertexID is
> > > > > > > derived from a hash code that is used to identify vertices across
> > > > > > > submissions. I'm not familiar with that component, though. I
> > > > > > > prefer to keep this idea out of the scope of this FLIP if no one
> > > > > > > can help us figure it out.
> > > > > > >
> > > > > > > > How about we still keep IntermediateDataSetID independent from
> > > > > > > > JobVertexID, but just print the producing relationships in logs?
> > > > > > > > I think keeping IntermediateDataSetID independent may be better
> > > > > > > > considering the cross-job result usages in interactive query cases.
> > > > > > > I think you are right. I'll keep IntermediateDataSetID independent
> > > > > > > from JobVertexID.
> > > > > > >
> > > > > > > > The new IDs will become larger with this rework.
> > > > > > > Yes, I have the same concern. A benchmark is necessary; I'll try
> > > > > > > to provide one during the implementation phase.
> > > > > > >
> > > > > > >
> > > > > > > Best,
> > > > > > > Yangze Guo
> > > > > > >
> > > > > > > On Tue, Mar 31, 2020 at 4:55 PM Zhu Zhu <reed...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > Thanks for proposing this improvement, Yangze. Big +1 for the
> > > > > > > > overall proposal. It can help a lot in troubleshooting.
> > > > > > > >
> > > > > > > > Here are a few questions about it:
> > > > > > > > 1. Shall we make JobVertexID a composition of JobID and a topology
> > > > > > > > index? This would help in the session cluster case, so that we can
> > > > > > > > identify which tasks are from which jobs, along with the rework of
> > > > > > > > ExecutionAttemptID.
> > > > > > > >
> > > > > > > > 2. You mentioned "Add the producer info to the string literal of
> > > > > > > > IntermediateDataSetID". Do you mean to make IntermediateDataSetID
> > > > > > > > a composition of JobVertexID and a consumer index?
> > > > > > > > How about we still keep IntermediateDataSetID independent from
> > > > > > > > JobVertexID, but just print the producing relationships in logs?
> > > > > > > > I think keeping IntermediateDataSetID independent may be better
> > > > > > > > considering the cross-job result usages in interactive query cases.
> > > > > > > >
> > > > > > > > 3. The new IDs will become larger with this rework. The
> > > > > > > > TaskDeploymentDescriptor can become much larger since it is mainly
> > > > > > > > composed of a variety of IDs.
> > > > > > > > I'm not sure how much larger, but there can be more memory and
> > > > > > > > CPU cost for it, resulting in more frequent GCs, message sizes
> > > > > > > > exceeding the Akka frame limit, and longer blocking of the main
> > > > > > > > thread. This should not be a problem in most cases, but it might
> > > > > > > > be a problem for large-scale jobs. Shall we have a benchmark for it?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Zhu Zhu
> > > > > > > >
> > > > > > > > Yangze Guo <karma...@gmail.com> wrote on Tue, Mar 31, 2020 at 2:19 PM:
> > > > > > > >
> > > > > > > > > Thank you all for the feedback! Sorry for the belated reply.
> > > > > > > > >
> > > > > > > > > @Till
> > > > > > > > > I'm +1 for your two ideas, and I'd like to move them out of the
> > > > > > > > > scope of this FLIP since pipelined region scheduling is ongoing
> > > > > > > > > work now.
> > > > > > > > > I also agree that we should not compose the InstanceID in
> > > > > > > > > TaskExecutorConnection from the ResourceID plus a monotonically
> > > > > > > > > increasing value. Thanks a lot for your explanation.
> > > > > > > > >
> > > > > > > > > @Konstantin @Yang
> > > > > > > > > Regarding the pod name of the TaskExecutor on K8s, I second Yang's
> > > > > > > > > suggestion. It makes sense to me to let users export RESOURCE_ID
> > > > > > > > > and make the TM respect it. Users need to guarantee that there are
> > > > > > > > > no collisions between different TMs.
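The suggestion above, letting the TaskManager take its resource ID from a user-exported `RESOURCE_ID` variable and otherwise falling back to a generated UUID as today, could look roughly like the following. The class and method names are hypothetical, and the variable name is taken from the thread as an assumption rather than a documented Flink setting. The environment is passed in as a map so the logic can be exercised without touching the real process environment.

```java
import java.util.Map;
import java.util.UUID;

public class ResourceIdSketch {

    // Variable name proposed in the thread; treat it as an assumption.
    static final String RESOURCE_ID_ENV = "RESOURCE_ID";

    // Returns the user-provided ID if present and non-blank; otherwise
    // falls back to a random UUID, like the current behavior described by Yang.
    static String resolveResourceId(Map<String, String> env) {
        String userDefined = env.get(RESOURCE_ID_ENV);
        if (userDefined != null && !userDefined.trim().isEmpty()) {
            return userDefined.trim();
        }
        return UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        // On standalone K8s, RESOURCE_ID could carry the pod name.
        System.out.println("resource ID: " + resolveResourceId(System.getenv()));
    }
}
```

On Kubernetes, the pod name can be injected into the container environment through the Downward API (an `env` entry with `valueFrom.fieldRef.fieldPath: metadata.name`), which covers Konstantin's pod-name use case. Keeping IDs unique across TaskManagers remains the user's responsibility.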
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Yangze Guo
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Tue, Mar 31, 2020 at 12:25 AM Steven Wu <stevenz...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > +1 on allowing a user-defined resourceId for the taskmanager
> > > > > > > > > >
> > > > > > > > > > On Sun, Mar 29, 2020 at 7:24 PM Yang Wang <danrtsey...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Konstantin,
> > > > > > > > > > >
> > > > > > > > > > > I think it is a good idea. Our users also report a similar issue
> > > > > > > > > > > with the resourceId of standalone clusters. Currently, when we
> > > > > > > > > > > start a standalone cluster, the `TaskManagerRunner` always
> > > > > > > > > > > generates a UUID for the resourceId. It is used to register with
> > > > > > > > > > > the jobmanager, and it is not easy to match with the real
> > > > > > > > > > > taskmanager, especially in container environments.
> > > > > > > > > > >
> > > > > > > > > > > I think a possible solution is to support a user-defined
> > > > > > > > > > > resourceId, which we could read from the environment. For
> > > > > > > > > > > standalone on K8s, we could set the "RESOURCE_ID" env to the pod
> > > > > > > > > > > name so that it is easier to match the taskmanager with the K8s pod.
> > > > > > > > > > >
> > > > > > > > > > > Moreover, I am afraid we could not set the pod name to the
> > > > > > > > > > > resourceId. I think you could set "deployment.meta.name", since
> > > > > > > > > > > the pod name is generated by K8s in the pattern
> > > > > > > > > > > {deployment.meta.name}-{rc.uuid}-{uuid}. Instead, we will set the
> > > > > > > > > > > resourceId to the pod name.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Yang
> > > > > > > > > > >
> > > > > > > > > > > Konstantin Knauf <konstan...@ververica.com> wrote on Sun, Mar 29, 2020 at 8:06 PM:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Yangze, Hi Till,
> > > > > > > > > > > >
> > > > > > > > > > > > thank you for working on this topic. I believe it will make
> > > > > > > > > > > > debugging large Apache Flink deployments much more feasible.
> > > > > > > > > > > >
> > > > > > > > > > > > I was wondering whether it would make sense to allow the user
> > > > > > > > > > > > to specify the Resource ID in standalone setups. For example,
> > > > > > > > > > > > many users still implicitly use standalone clusters on
> > > > > > > > > > > > Kubernetes (the native support is still experimental), and in
> > > > > > > > > > > > these cases it would be interesting to also set the PodName as
> > > > > > > > > > > > the ResourceID. What do you think?
> > > > > > > > > > > >
> > > > > > > > > > > > Cheers,
> > > > > > > > > > > >
> > > > > > > > > > > > Konstantin
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Mar 26, 2020 at 6:49 PM Till Rohrmann <trohrm...@apache.org> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Yangze,
> > > > > > > > > > > > >
> > > > > > > > > > > > > thanks for creating this FLIP.
> > > > > > > > > > > > > I think it is a very good improvement that helps our users and
> > > > > > > > > > > > > ourselves better understand what's going on in Flink.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Creating the ResourceIDs with host information/pod name is a good
> > > > > > > > > > > > > idea.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Also, deriving ExecutionGraph IDs from their superset ID is a good
> > > > > > > > > > > > > idea.
> > > > > > > > > > > > >
> > > > > > > > > > > > > The InstanceID is used for fencing purposes. I would not make it a
> > > > > > > > > > > > > composition of the ResourceID + a monotonically increasing number.
> > > > > > > > > > > > > The problem is that in case of an RM failure, the InstanceIDs would
> > > > > > > > > > > > > start from 0 again, and this could lead to collisions.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Logging more information on how the different runtime IDs are
> > > > > > > > > > > > > correlated is also a good idea.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Two other ideas for simplifying the IDs are the following:
> > > > > > > > > > > > >
> > > > > > > > > > > > > * The SlotRequestID was introduced because the SlotPool was a
> > > > > > > > > > > > > separate RpcEndpoint a while ago. With this no longer being the
> > > > > > > > > > > > > case, I think we could remove the SlotRequestID and replace it
> > > > > > > > > > > > > with the AllocationID.
> > > > > > > > > > > > > * Instead of creating new SlotRequestIDs for multi-task slots, one
> > > > > > > > > > > > > could derive them from the SlotRequestID used for requesting the
> > > > > > > > > > > > > underlying AllocatedSlot.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Given that the slot sharing logic will most likely be reworked
> > > > > > > > > > > > > with the pipelined region scheduling, we might be able to resolve
> > > > > > > > > > > > > these two points as part of the pipelined region scheduling effort.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > Till
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Thu, Mar 26, 2020 at 10:51 AM Yangze Guo <karma...@gmail.com> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi everyone,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > We would like to start a discussion thread on "FLIP-118: Improve
> > > > > > > > > > > > > > Flink’s ID system"[1].
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > This FLIP mainly discusses the following issues, aiming to
> > > > > > > > > > > > > > enhance the readability of IDs in logs and help users debug in
> > > > > > > > > > > > > > case of failures:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > - Enhance the readability of the string literals of IDs. Most of
> > > > > > > > > > > > > > them are hashcodes, e.g. ExecutionAttemptID, which do not provide
> > > > > > > > > > > > > > much meaningful information and are hard for users to recognize
> > > > > > > > > > > > > > and compare.
> > > > > > > > > > > > > > - Log the IDs' lineage information to make debugging more
> > > > > > > > > > > > > > convenient. Currently, the log does not always show the lineage
> > > > > > > > > > > > > > information between IDs. Finding out the relationships between
> > > > > > > > > > > > > > entities identified by given IDs is a common demand, e.g., which
> > > > > > > > > > > > > > slot (by AllocationID) is assigned to satisfy the slot request
> > > > > > > > > > > > > > with a given SlotRequestID. Without such lineage information, it
> > > > > > > > > > > > > > is currently impossible to track the end-to-end lifecycle of an
> > > > > > > > > > > > > > Execution or a Task, which makes debugging difficult.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Key changes proposed in the FLIP are as follows:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > - Add location information to distributed components
> > > > > > > > > > > > > > - Add topology information to graph components
> > > > > > > > > > > > > > - Log the IDs' lineage information
> > > > > > > > > > > > > > - Expose the identifiers of distributed components to users
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Please find more details in the FLIP wiki document [1]. Looking
> > > > > > > > > > > > > > forward to your feedback.
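To illustrate the first two proposed changes (readable string literals and topology information in graph IDs), here is a hypothetical composed attempt ID whose `toString()` shows the producing vertex, the subtask index and the attempt number. The class name and the `vertex_subtask_attempt` format are assumptions for illustration only; the FLIP text quoted here does not fix a format. `equals` and `hashCode` cover all three fields, so a retry of the same subtask still gets a distinct ID.

```java
import java.util.Objects;
import java.util.UUID;

public class ReadableIdSketch {

    // Hypothetical composed attempt ID carrying topology information.
    static final class AttemptId {
        final String jobVertexId; // stand-in for a JobVertexID's hex string
        final int subtaskIndex;
        final int attemptNumber;

        AttemptId(String jobVertexId, int subtaskIndex, int attemptNumber) {
            this.jobVertexId = Objects.requireNonNull(jobVertexId);
            this.subtaskIndex = subtaskIndex;
            this.attemptNumber = attemptNumber;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof AttemptId)) {
                return false;
            }
            AttemptId other = (AttemptId) o;
            return subtaskIndex == other.subtaskIndex
                    && attemptNumber == other.attemptNumber
                    && jobVertexId.equals(other.jobVertexId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(jobVertexId, subtaskIndex, attemptNumber);
        }

        // Readable literal: vertex, subtask and attempt are visible directly in logs.
        @Override
        public String toString() {
            return jobVertexId + "_" + subtaskIndex + "_" + attemptNumber;
        }
    }

    public static void main(String[] args) {
        String vertex = UUID.randomUUID().toString().replace("-", "");
        AttemptId first = new AttemptId(vertex, 3, 0);
        AttemptId retry = new AttemptId(vertex, 3, 1);
        System.out.println(first + " retried as " + retry);
    }
}
```

The trade-off discussed in the thread still applies: a composed ID like this is larger on the wire than a plain 16-byte random ID.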
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148643521
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > Yangze Guo
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > --
> > > > > > > > > > > >
> > > > > > > > > > > > Konstantin Knauf | Head of Product