Aleksandr Savonin created FLINK-39776:
-----------------------------------------

             Summary: Expose JobInfo on Source contexts
                 Key: FLINK-39776
                 URL: https://issues.apache.org/jira/browse/FLINK-39776
             Project: Flink
          Issue Type: Improvement
          Components: API / Core, Runtime / Coordination
            Reporter: Aleksandr Savonin


{{SinkV2}} already exposes {{JobInfo}} to writers and committers via 
{{InitContext.getJobInfo()}}. The Source API has no equivalent on 
{{SourceReaderContext}} or {{SplitEnumeratorContext}}. Source 
implementations that need the {{JobID}}, for example for per-job 
authentication to external services, per-job metric tagging, per-job offset 
stores in external systems, or per-job log correlation, must currently either:
 * Cast {{SourceReaderContext}} to the {{@Experimental}} 
{{RichSourceReaderContext}} and access it through 
{{getRuntimeContext().getJobInfo()}}. The drawback is that this interface has 
no stability contract and no equivalent on the enumerator side: 
{{SourceCoordinatorContext}} consumes {{JobID}} only via 
{{MdcUtils.scopeToJob}} for internal MDC scoping and never exposes it through 
{{SplitEnumeratorContext}}.

 * Parse {{metricGroup().getAllVariables().get("<job_id>")}} as a {{String}} 
and reconstruct {{JobID}} via {{JobID.fromHexString(...)}}. This depends on the 
internal metric scope variable naming ({{ScopeFormat.SCOPE_JOB_ID}}), which is 
not an API contract.
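
To make the fragility of the second workaround concrete, here is a minimal, self-contained sketch. It does not use Flink classes: the {{Map}} stands in for the result of {{metricGroup().getAllVariables()}}, the class name {{JobIdFromMetricScope}} is hypothetical, and the 32-hex-character check is an assumption about the string form of a {{JobID}}. In a real source the validated string would then be passed to {{JobID.fromHexString(...)}}.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper, not part of Flink; it only models the lookup.
final class JobIdFromMetricScope {
    // Key named in the issue; in Flink it originates from the internal
    // ScopeFormat.SCOPE_JOB_ID, which is not a public API contract.
    static final String SCOPE_JOB_ID = "<job_id>";

    /**
     * Extracts the job ID hex string from metric scope variables.
     * Returns empty if the variable is missing or is not 32 hex characters
     * (the length check is an assumption made for this sketch).
     */
    static Optional<String> jobIdHex(Map<String, String> allVariables) {
        String hex = allVariables.get(SCOPE_JOB_ID);
        if (hex == null || !hex.matches("[0-9a-fA-F]{32}")) {
            return Optional.empty();
        }
        // A real source would call JobID.fromHexString(hex) at this point.
        return Optional.of(hex);
    }

    public static void main(String[] args) {
        Map<String, String> vars =
                Collections.singletonMap(SCOPE_JOB_ID, "0123456789abcdef0123456789abcdef");
        System.out.println(jobIdHex(vars).orElse("unavailable"));
        // If the scope variable were renamed or absent, the lookup silently fails.
        System.out.println(
                jobIdHex(Collections.<String, String>emptyMap()).orElse("unavailable"));
    }
}
```

The empty-map case is the point of the sketch: nothing at compile time ties the source to the variable name, so a change in metric scope naming would only surface at runtime.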

The runtime already holds the data at both source execution points: 
{{SourceOperator}} has it via {{StreamingRuntimeContext}} (constructor-injected 
with {{env.getJobInfo()}}), and {{SourceCoordinatorContext}} has it via the 
existing {{@Internal}} {{OperatorCoordinator.Context.getJobID()}}. The gap 
is purely in the public API surface.
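
As a rough illustration of what closing that gap could look like from a connector author's side, the sketch below uses hypothetical stand-in interfaces ({{JobInfoSketch}}, {{ReaderContextSketch}}) rather than Flink's real types. The accessor name {{getJobInfo()}} mirrors the SinkV2 method mentioned above; the actual shape of the proposed API is defined in the FLIP, not here.

```java
import java.util.Objects;

// Hypothetical sketch; none of these types are Flink classes.
final class PerJobTagging {
    /** Stand-in for a job info object; only the job ID is modeled. */
    interface JobInfoSketch {
        String jobIdHex();
    }

    /** Stand-in for a source context that exposes job info directly. */
    interface ReaderContextSketch {
        JobInfoSketch getJobInfo();
    }

    /** Builds a per-job tag without casting or parsing metric scope variables. */
    static String metricTag(ReaderContextSketch context) {
        JobInfoSketch info = Objects.requireNonNull(context.getJobInfo(), "jobInfo");
        return "job=" + info.jobIdHex();
    }

    public static void main(String[] args) {
        // Both stand-ins have a single method, so lambdas are enough here.
        ReaderContextSketch context = () -> () -> "0123456789abcdef0123456789abcdef";
        System.out.println(metricTag(context));
    }
}
```

With an accessor of this kind on both the reader and enumerator contexts, the per-job use cases listed earlier would not need to depend on {{@Experimental}} or internal classes.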

 

FLIP: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-583%3A+Expose+JobInfo+on+Source+contexts


