Aleksandr Savonin created FLINK-39776:
-----------------------------------------
Summary: Expose JobInfo on Source contexts
Key: FLINK-39776
URL: https://issues.apache.org/jira/browse/FLINK-39776
Project: Flink
Issue Type: Improvement
Components: API / Core, Runtime / Coordination
Reporter: Aleksandr Savonin
{{SinkV2}} already exposes {{JobInfo}} to writers and committers via
{{InitContext.getJobInfo()}}. The Source API has no equivalent on
{{SourceReaderContext}} or {{SplitEnumeratorContext}}. Source
implementations that need the {{JobID}}, for example for per-job
authentication to external services, per-job metric tagging, per-job offset
stores in external systems, or per-job log correlation, must currently either:
* Cast {{SourceReaderContext}} to the {{@Experimental}}
{{RichSourceReaderContext}} and access it through
{{getRuntimeContext().getJobInfo()}}. The drawback is that this path has no
stability contract and no equivalent on the enumerator side:
{{SourceCoordinatorContext}} consumes {{JobID}} only via
{{MdcUtils.scopeToJob}} for internal MDC scoping and never exposes it through
{{SplitEnumeratorContext}}.
* Parse {{metricGroup().getAllVariables().get("<job_id>")}} as a {{String}}
and reconstruct {{JobID}} via {{JobID.fromHexString(...)}}. This depends on the
internal metric scope variable naming ({{ScopeFormat.SCOPE_JOB_ID}}), which is
not an API contract.
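
To illustrate why the second workaround is fragile, here is a minimal sketch that uses no Flink classes. The variable map stands in for the result of {{metricGroup().getAllVariables()}}, the class and method names are hypothetical, and the 32-character hex check is an assumption about the string form of a {{JobID}}.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Stand-in sketch of the metric-variable workaround; no Flink classes are used. */
final class JobIdFromMetricVariables {

    // Assumption: this key mirrors the "<job_id>" scope variable quoted in the issue.
    static final String JOB_ID_VARIABLE = "<job_id>";

    /**
     * Returns the job ID hex string if the variable is present and looks like
     * 32 hex characters (assumed format), otherwise an empty Optional.
     */
    static Optional<String> extractJobIdHex(Map<String, String> metricVariables) {
        String raw = metricVariables.get(JOB_ID_VARIABLE);
        if (raw == null || !raw.matches("[0-9a-fA-F]{32}")) {
            return Optional.empty();
        }
        return Optional.of(raw);
    }

    public static void main(String[] args) {
        Map<String, String> variables = new HashMap<>();
        variables.put(JOB_ID_VARIABLE, "0123456789abcdef0123456789abcdef");
        System.out.println(extractJobIdHex(variables).orElse("job id unavailable"));

        // If the scope variable were renamed, the lookup would silently find nothing.
        Map<String, String> renamed = new HashMap<>();
        renamed.put("<jobId>", "0123456789abcdef0123456789abcdef");
        System.out.println(extractJobIdHex(renamed).orElse("job id unavailable"));
    }
}
```

The second call shows the failure mode: the caller only learns at runtime that the key it relied on is gone, because nothing in the public API ties the source to that name.
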
The runtime already holds the data at both source execution points:
{{SourceOperator}} has it via {{StreamingRuntimeContext}} (constructor-injected
with {{env.getJobInfo()}}), and {{SourceCoordinatorContext}} has it via the
already {{@Internal}} {{OperatorCoordinator.Context.getJobID()}}. The gap
is purely in the public API surface.
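
As a rough illustration of what closing that gap could look like from a connector's point of view, the sketch below uses stand-in interfaces only. The type and method names here are hypothetical, are not taken from the FLIP, and may differ from whatever signatures are finally adopted. It assumes a {{getJobInfo()}} accessor on both contexts, mirroring the SinkV2 accessor mentioned above.

```java
/** Stand-in types for illustration; none of these are Flink's real classes. */
final class SourceJobInfoSketch {

    /** Hypothetical minimal view of job metadata (the real JobInfo may differ). */
    interface JobInfo {
        String getJobId();
        String getJobName();
    }

    /** Hypothetical reader-side context with a job info accessor. */
    interface ReaderContext {
        JobInfo getJobInfo();
    }

    /** Hypothetical enumerator-side context with the same accessor. */
    interface EnumeratorContext {
        JobInfo getJobInfo();
    }

    /** Builds a per-job key, e.g. for an external offset store or a log tag. */
    static String perJobKey(JobInfo info, String suffix) {
        return info.getJobId() + "/" + suffix;
    }

    public static void main(String[] args) {
        JobInfo info = new JobInfo() {
            @Override public String getJobId() { return "0123456789abcdef0123456789abcdef"; }
            @Override public String getJobName() { return "example-job"; }
        };
        // Both contexts hand out the same job metadata without casts or metric parsing.
        ReaderContext reader = () -> info;
        EnumeratorContext enumerator = () -> info;
        System.out.println(perJobKey(reader.getJobInfo(), "offsets"));
        System.out.println(perJobKey(enumerator.getJobInfo(), "splits"));
    }
}
```

With an accessor of this kind on both sides, reader and enumerator code could derive per-job keys the same way, which is the symmetry the two workarounds above cannot offer.
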
FLIP:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-583%3A+Expose+JobInfo+on+Source+contexts