fapaul commented on pull request #18428:
URL: https://github.com/apache/flink/pull/18428#issuecomment-1027285397


   > Hi Fabian @fapaul ~ Many thanks for the update! Overall, I also agree that we could move some issues to separate PRs to unblock the following tasks, especially issues that only relate to the internal implementation.
   > 
   > Currently, the main remaining issues from my side are the following:
   > 
   > **1. About setParallelism:**
   > 
   > I think the current solution to the setParallelism issue is creative, but since it affects the result of `Transformation#getParallelism`, I'm a bit concerned about whether we have enumerated all the cases. Currently, I see two possible issues:
   > 
   > * The semantics of `DataStream#getParallelism` change: previously it never returned -1. This might be fixed by returning the env's default parallelism if the transformation's parallelism is -1.
   > * `TableOperatorWrapperGenerator#calcParallelismAndResource` might run into problems when it handles both transformations with default parallelism and transformations with configured parallelism. Sorry, I do not see an easy solution to this issue.
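
The fallback suggested in the first bullet can be sketched as follows. This is a minimal, hypothetical sketch, not Flink's actual `DataStream#getParallelism`: `ParallelismFallback` and `effectiveParallelism` are illustrative names, and it assumes the environment's default parallelism is known and positive. Only the value -1 mirrors Flink's `ExecutionConfig.PARALLELISM_DEFAULT`.

```java
// Hypothetical sketch: not Flink's DataStream#getParallelism implementation.
final class ParallelismFallback {
    // Mirrors the value of Flink's ExecutionConfig.PARALLELISM_DEFAULT (-1 means "not set").
    static final int PARALLELISM_DEFAULT = -1;

    private ParallelismFallback() {}

    /**
     * Returns the parallelism a caller would observe: the transformation's own value
     * if it was set explicitly, otherwise the environment's default, so -1 never
     * leaks to the caller.
     */
    static int effectiveParallelism(int transformationParallelism, int envDefaultParallelism) {
        if (transformationParallelism == PARALLELISM_DEFAULT) {
            return envDefaultParallelism;
        }
        return transformationParallelism;
    }

    public static void main(String[] args) {
        System.out.println(effectiveParallelism(PARALLELISM_DEFAULT, 4)); // prints 4
        System.out.println(effectiveParallelism(2, 4));                    // prints 2
    }
}
```

This only restores the old observable contract of the getter. It does not address the second bullet, where a consumer has to handle a mix of default and configured parallelism.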
   
   I am not very familiar with this part of the code base, but so far I have not seen any failing tests related to `TableOperatorWrapperGenerator`. Can you elaborate a bit on why the current solution may cause problems?
   
   
   > I tried to find a workaround for this issue first, but unfortunately I have not been able to find one yet.
   > 
   > **2. About the repeat transformation**
   > 
   > For the repeated transformation problem, I'm a bit hesitant: why don't we remove the newly added transformations instead of the `SinkTransformation`? For example, users may first call `getExecutionPlan()` and then continue to change the properties of the `SinkTransformation`...
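
The alternative raised in the quote (drop the transformations generated during translation but keep the `SinkTransformation`) can be illustrated with a small hypothetical model. `RepeatTranslationSketch`, `SinkNode`, and `translate` are illustrative names, not Flink's `StreamGraphGenerator` or `SinkTransformationTranslator`; `translate()` merely stands in for whatever both `getExecutionPlan()` and `execute()` trigger.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not Flink's StreamGraphGenerator or SinkTransformationTranslator.
final class RepeatTranslationSketch {

    /** Stand-in for a user-facing sink transformation whose properties may still change. */
    static final class SinkNode {
        final String name;
        int parallelism;

        SinkNode(String name, int parallelism) {
            this.name = name;
            this.parallelism = parallelism;
        }
    }

    private final List<SinkNode> sinks = new ArrayList<>();
    // Transformations created while expanding the sinks during translation.
    private final List<String> expanded = new ArrayList<>();

    SinkNode addSink(String name, int parallelism) {
        SinkNode sink = new SinkNode(name, parallelism);
        sinks.add(sink);
        return sink;
    }

    /**
     * Expands every sink into writer/committer nodes and returns a plan string.
     * Afterwards only the expanded nodes are removed; the SinkNode objects stay, so a
     * later call sees the user's changes and does not duplicate writer/committer nodes.
     */
    String translate() {
        for (SinkNode sink : sinks) {
            expanded.add(sink.name + "-writer(p=" + sink.parallelism + ")");
            expanded.add(sink.name + "-committer(p=" + sink.parallelism + ")");
        }
        String plan = String.join(", ", expanded);
        expanded.clear(); // remove the newly added transformations, not the sink itself
        return plan;
    }

    public static void main(String[] args) {
        RepeatTranslationSketch env = new RepeatTranslationSketch();
        SinkNode sink = env.addSink("sink", 2);
        System.out.println(env.translate()); // sink-writer(p=2), sink-committer(p=2)
        sink.parallelism = 4;                // the user keeps configuring the sink
        System.out.println(env.translate()); // sink-writer(p=4), sink-committer(p=4)
    }
}
```

If the sink itself were removed after the first translation instead, the second call would no longer see the user's later change.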
   
   Good point, I will take a look at this tomorrow.
   
   > **3. About the commit protocol**
   > 
   > Currently, I'm a bit unsure whether we should allow only one summary message for each (checkpoint, subtask) pair, or allow multiple messages as long as they all carry the same total count. The problem is that if we allow the total count to be updated, downstream operators have no way to know whether they have received all the committables for a checkpoint.
   > 
   > Would it be possible, for now, to throw an exception if the `CommitOperator` receives a repeated summary message, while keeping the current implementation, in which it is still possible to emit multiple summary messages for a single checkpoint? This places stricter requirements on the `pre-committer topology` and `post-committer topology`, so it would not cause compatibility problems even if we change the behavior afterwards.
   > 
   > I have also thought about whether unaligned checkpoints combined with rescaling might cause problems, but so far I have not found any issues.
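
The stricter rule proposed in the quote, rejecting a repeated summary for the same (checkpoint, subtask) pair, could look roughly like the following. This is a hypothetical sketch, not Flink's `CommitOperator`. `StrictSummaryTracker`, `onSummary`, and `expectedCount` are illustrative names, and the string key is a simplification.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the stricter rule, not Flink's CommitOperator.
final class StrictSummaryTracker {
    // Expected committable count per (checkpoint, subtask) pair.
    private final Map<String, Integer> expected = new HashMap<>();

    private static String key(long checkpointId, int subtaskId) {
        return checkpointId + "/" + subtaskId;
    }

    /** Accepts exactly one summary per (checkpoint, subtask) pair and rejects any repeat. */
    void onSummary(long checkpointId, int subtaskId, int totalCount) {
        Integer previous = expected.putIfAbsent(key(checkpointId, subtaskId), totalCount);
        if (previous != null) {
            throw new IllegalStateException(
                    "Repeated summary for checkpoint " + checkpointId + ", subtask " + subtaskId);
        }
    }

    /** Returns the announced total, or null if no summary has arrived yet. */
    Integer expectedCount(long checkpointId, int subtaskId) {
        return expected.get(key(checkpointId, subtaskId));
    }

    public static void main(String[] args) {
        StrictSummaryTracker tracker = new StrictSummaryTracker();
        tracker.onSummary(1L, 0, 2);
        try {
            tracker.onSummary(1L, 0, 3);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Repeated summary for checkpoint 1, subtask 0
        }
    }
}
```

Failing loudly rather than silently overwriting keeps the contract strict. Topologies that satisfy it would keep working if the rule were relaxed later, which is the compatibility argument made in the quote.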
   
   AFAICT, the motivation for updating the commit summary is exactly unaligned checkpointing. If the checkpoint barrier overtakes the records, we can first emit only an incomplete summary and later need to emit a new summary with the updated number of seen records.
   In general, I am also open to restricting it in the first version and 
figuring out the unaligned checkpoint use cases later. WDYT?
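
The trade-off discussed here can be illustrated with a hypothetical tracker that lets a later summary overwrite the expected total, as the unaligned-checkpoint case would require. `LenientCommittableTracker` and its methods are illustrative names, not Flink's `CommitOperator`. The sketch assumes each committable is counted exactly once.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Flink's CommitOperator: a later summary may overwrite the total.
final class LenientCommittableTracker {
    private final Map<String, Integer> expected = new HashMap<>();
    private final Map<String, Integer> received = new HashMap<>();

    private static String key(long checkpointId, int subtaskId) {
        return checkpointId + "/" + subtaskId;
    }

    /** A summary announces the subtask's committable count; later ones overwrite earlier ones. */
    void onSummary(long checkpointId, int subtaskId, int totalCount) {
        expected.put(key(checkpointId, subtaskId), totalCount);
    }

    void onCommittable(long checkpointId, int subtaskId) {
        received.merge(key(checkpointId, subtaskId), 1, Integer::sum);
    }

    /** The only completeness signal available: received count equals the currently known total. */
    boolean isComplete(long checkpointId, int subtaskId) {
        String k = key(checkpointId, subtaskId);
        Integer total = expected.get(k);
        int seen = received.getOrDefault(k, 0);
        return total != null && seen == total;
    }

    public static void main(String[] args) {
        LenientCommittableTracker tracker = new LenientCommittableTracker();
        tracker.onSummary(1L, 0, 2); // incomplete summary, the barrier overtook a record
        tracker.onCommittable(1L, 0);
        tracker.onCommittable(1L, 0);
        System.out.println(tracker.isComplete(1L, 0)); // true: matches the first summary
        tracker.onSummary(1L, 0, 3); // updated summary arrives later
        System.out.println(tracker.isComplete(1L, 0)); // false: now 3 committables are expected
    }
}
```

A downstream operator that acted as soon as the counts first matched would have acted on an incomplete checkpoint. This is why restricting the protocol in the first version is the safer default.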
   
   > 
   > **4. About the PartitionTransformation (Optional)**
   > 
   > > Your suggestion is definitely possible, but it does not work for exchanges in the sink topology provided by the user. In this case, I would need to move the current logic from PartitionTransformationTranslator to SinkTransformationTranslator. Perhaps it is a more general issue, and the fix should reside in PartitionTransformationTranslator rather than only in the sink translation.
   > > Is there a downside to adding it to the PartitionTransformationTranslator?
   > 
   > I think changing the partition type on the fly might hide possible issues in the callers. `PartitionTransformationTranslator` could check for impossible cases and throw an exception, but it should not modify the partition type directly; the callers should make sure to avoid the impossible cases. This issue can be postponed since, for now, it only relates to the implementation.
   
   I think at some point we will have to modify the partition transformation. Users might use a partition transformation in their custom sink topologies and want it to work in both batch and streaming mode, so it would be great to use blocking exchanges in batch mode and the default exchange in streaming mode.
   Throwing an exception would mean supporting either batch or streaming, but not both. Overall, I agree we can revisit this topic after this PR.
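
Resolving the exchange mode by runtime mode, instead of throwing, could look roughly like this. This is a hypothetical sketch. The enums only mirror, and are not, Flink's `RuntimeExecutionMode` and `StreamExchangeMode`. `resolve` is an illustrative name, and leaving an explicitly chosen mode untouched is an assumption.

```java
// Hypothetical sketch: these enums mirror, but are not, Flink's RuntimeExecutionMode
// and StreamExchangeMode.
final class ExchangeModeSelection {
    enum RuntimeMode { STREAMING, BATCH }

    enum ExchangeMode { PIPELINED, BATCH, UNDEFINED }

    private ExchangeModeSelection() {}

    /**
     * Resolves the exchange of a partition step in a user-provided sink topology:
     * blocking (BATCH) in batch mode, the default (UNDEFINED) in streaming mode.
     * Assumption: a mode the user set explicitly is kept as-is.
     */
    static ExchangeMode resolve(ExchangeMode requested, RuntimeMode runtimeMode) {
        if (requested != ExchangeMode.UNDEFINED) {
            return requested;
        }
        return runtimeMode == RuntimeMode.BATCH ? ExchangeMode.BATCH : ExchangeMode.UNDEFINED;
    }

    public static void main(String[] args) {
        System.out.println(resolve(ExchangeMode.UNDEFINED, RuntimeMode.BATCH));     // BATCH
        System.out.println(resolve(ExchangeMode.UNDEFINED, RuntimeMode.STREAMING)); // UNDEFINED
    }
}
```

The same user topology then works in both modes, which is the point made above against throwing an exception.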
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
