fapaul commented on pull request #18428: URL: https://github.com/apache/flink/pull/18428#issuecomment-1027285397
> Hi Fabian @fapaul, many thanks for the update! Overall, I also agree that we could move some issues to separate PRs to unblock the following tasks, especially the issues that only concern the internal implementation.
>
> Currently, the main remaining issues from my side are as follows:
>
> **1. About setParallelism:**
>
> I think the current solution to the setParallelism issue is creative, but since it affects the result of `Transformation#getParallelism`, I'm a bit concerned about whether we have enumerated all the cases. Currently I see two possible issues:
>
> * The semantics of `DataStream#getParallelism` have changed: previously it never returned -1. This might be fixed by returning the env's default parallelism if the transformation's parallelism is -1.
> * `TableOperatorWrapperGenerator#calcParallelismAndResource` might have issues if it deals with both transformations with the default parallelism and transformations with a configured parallelism. Sorry, I do not see an easy solution to this issue.

I am not really familiar with this part of the code base, but so far I have not seen any failing tests related to `TableOperatorWrapperGenerator`. Can you elaborate a bit on why the current solution may cause problems?

> I tried to see whether we could work around this issue first, but unfortunately I have not been able to find a solution.
>
> **2. About the repeated transformation**
>
> For the repeated transformation problem, I'm a bit hesitant: why don't we remove the newly added transformations instead of the `SinkTransformation`? For example, users can first call `getExecutionPlan()` and then continue to change the properties of the `SinkTransformation`...

Good point, I will take a look at this tomorrow.

> **3. About the commit protocol**
>
> Currently I'm a bit concerned about whether we should keep only one summary message for each (checkpoint, subtask) pair, or allow multiple messages as long as they all carry the same total count.
> The problem is that if we allow updating the total count, the downstream operators would have no way to know whether they have received all the committables for a checkpoint.
>
> Is it possible, for now, to throw an exception if the `CommitOperator` receives a repeated summary message, while keeping the current implementation in which multiple summary messages can still be emitted for a single checkpoint? This imposes stricter requirements on the `pre-committer topology` and `post-committer topology`, so it would not cause compatibility problems even if we change the behavior afterwards.
>
> Also, I have tried to think about whether unaligned checkpoints + rescaling might cause problems, but currently I do not see any issues.

Afaict the motivation for updating the commit summary is exactly unaligned checkpointing. If the checkpoint barrier overtakes the records, we only emit an incomplete summary and later need to emit a new summary with the updated number of seen records. In general, I am also open to restricting this in the first version and figuring out the unaligned checkpoint use cases later. WDYT?

> **4. About the PartitionTransformation (Optional)**
>
> > Your suggestion is definitely possible, but it does not work for exchanges in the sink topology provided by the user. In that case, I would need to move the current logic from PartitionTransformationTranslator to SinkTransformationTranslator. Perhaps it is a more general issue, and the fix should reside in PartitionTransformationTranslator rather than only in the sink translation.
> >
> > Is there a downside to adding it to the PartitionTransformationTranslator?
>
> I think changing the partition type on the fly might hide possible issues in the callers. I think `PartitionTransformationTranslator` could check and throw an exception for impossible cases, but it should not modify the partitioning directly. The callers should make sure to avoid the impossible cases.
> This issue might be postponed since it only relates to the implementation for now.

I think at some point we have to modify the partition transformation. Users might use a partition transformation in their custom sink topologies, and they want it to work in both batch and streaming mode. So it would be great to allow blocking exchanges in batch mode and the default exchange in streaming mode. Throwing an exception would mean supporting either batch or streaming, but not both. Overall, I agree we can revisit this topic after this PR.
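The idea in the last reply is to pick the exchange per runtime mode instead of throwing. It could look roughly like the minimal Java sketch below. This is not Flink's `PartitionTransformationTranslator`. `ExchangeModeSketch`, `RuntimeMode`, `ExchangeMode`, and `resolve` are hypothetical names. The sketch also assumes, for illustration only, that the streaming default is a pipelined exchange.

```java
import java.util.Objects;

/**
 * Hypothetical sketch, not Flink code: instead of rejecting an exchange that only suits
 * one runtime mode, resolve an unspecified exchange mode per runtime mode so that one
 * user-defined sink topology can run in both batch and streaming.
 */
final class ExchangeModeSketch {

    /** Hypothetical stand-in for the job's runtime mode. */
    enum RuntimeMode { STREAMING, BATCH }

    /** Hypothetical stand-in for the exchange mode of a partitioning step. */
    enum ExchangeMode { PIPELINED, BLOCKING, UNDEFINED }

    private ExchangeModeSketch() {}

    /**
     * Keeps an explicitly requested exchange mode; otherwise picks a blocking exchange
     * in batch mode and a pipelined exchange (assumed streaming default) in streaming mode.
     */
    static ExchangeMode resolve(RuntimeMode runtimeMode, ExchangeMode requested) {
        Objects.requireNonNull(runtimeMode, "runtimeMode");
        Objects.requireNonNull(requested, "requested");
        if (requested != ExchangeMode.UNDEFINED) {
            return requested;
        }
        return runtimeMode == RuntimeMode.BATCH ? ExchangeMode.BLOCKING : ExchangeMode.PIPELINED;
    }

    public static void main(String[] args) {
        // The same user-defined exchange (mode left unspecified) under both runtime modes.
        for (RuntimeMode mode : RuntimeMode.values()) {
            System.out.println(mode + " -> " + resolve(mode, ExchangeMode.UNDEFINED));
        }
    }
}
```

Running `main` prints `STREAMING -> PIPELINED` and then `BATCH -> BLOCKING`. A topology that leaves the exchange unspecified therefore works in either mode. The check-and-throw approach suggested above would fit at the point where `resolve` returns an explicitly requested mode. Which combinations count as impossible is left open here.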
