[ 
https://issues.apache.org/jira/browse/FLINK-39776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan reassigned FLINK-39776:
-------------------------------

    Assignee: Aleksandr Savonin

> Expose JobInfo on Source contexts
> ---------------------------------
>
>                 Key: FLINK-39776
>                 URL: https://issues.apache.org/jira/browse/FLINK-39776
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Core, Runtime / Coordination
>            Reporter: Aleksandr Savonin
>            Assignee: Aleksandr Savonin
>            Priority: Major
>
> {{SinkV2}} already exposes {{JobInfo}} to writers and committers via
> {{InitContext.getJobInfo()}}. The Source API has no equivalent on
> {{SourceReaderContext}} or {{SplitEnumeratorContext}}. Source
> implementations that need the {{JobID}}, for example for per-job
> authentication to external services, per-job metric tagging, per-job offset
> stores in external systems, or per-job log correlation, must currently either:
>  * Cast {{SourceReaderContext}} to the {{@Experimental}}
> {{RichSourceReaderContext}} and access it through
> {{getRuntimeContext().getJobInfo()}}. Drawbacks: this has no stability
> contract and no equivalent on the enumerator side, where
> {{SourceCoordinatorContext}} consumes {{JobID}} only via
> {{MdcUtils.scopeToJob}} for internal MDC scoping and never exposes it through
> {{SplitEnumeratorContext}}.
>  * Parse {{metricGroup().getAllVariables().get("<job_id>")}} as a {{String}}
> and reconstruct {{JobID}} via {{JobID.fromHexString(...)}}. This depends on
> the internal metric scope variable naming ({{ScopeFormat.SCOPE_JOB_ID}}),
> which is not an API contract.
> The runtime already holds the data at both source execution points:
> {{SourceOperator}} has it via {{StreamingRuntimeContext}}
> (constructor-injected with {{env.getJobInfo()}}).
> {{SourceCoordinatorContext}} has it via the already {{@Internal}}
> {{OperatorCoordinator.Context.getJobID()}}. The gap is purely in the
> public API surface.
>  
> FLIP: 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-583%3A+Expose+JobInfo+on+Source+contexts
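
To illustrate why the second workaround in the description is fragile, here is a minimal sketch in plain Java. It does not use any Flink classes: the {{Map}} stands in for the result of {{metricGroup().getAllVariables()}}, the class and method names are hypothetical, and the 32-character hex check is an assumption about the job ID's string form rather than a documented contract.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper (not part of Flink) that mimics the metric-variable workaround. */
final class JobIdFromMetricVariables {

    // Key taken from the issue text; it mirrors an internal scope variable name,
    // so a rename on the runtime side would silently break this lookup.
    static final String JOB_ID_VARIABLE = "<job_id>";

    private JobIdFromMetricVariables() {}

    /**
     * Looks up the job ID in a variables map and returns it only if it looks like
     * a 32-character hex string (assumed format). Returns empty otherwise.
     */
    static Optional<String> extractJobIdHex(Map<String, String> variables) {
        String raw = variables.get(JOB_ID_VARIABLE);
        if (raw == null || !raw.matches("[0-9a-fA-F]{32}")) {
            return Optional.empty();
        }
        return Optional.of(raw.toLowerCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        // Variable present and well-formed: the ID can be recovered.
        Map<String, String> ok = Map.of(JOB_ID_VARIABLE, "0123456789ABCDEF0123456789abcdef");
        System.out.println(extractJobIdHex(ok));

        // Variable stored under a different key: nothing is found.
        Map<String, String> renamed = Map.of("<jobId>", "0123456789abcdef0123456789abcdef");
        System.out.println(extractJobIdHex(renamed));
    }
}
```

The second call shows the failure mode: the caller gets no compile-time signal when the variable name changes, which is the kind of coupling a public accessor on the source contexts would avoid.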



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
