1996fanrui commented on code in PR #23550:
URL: https://github.com/apache/flink/pull/23550#discussion_r1368530995
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java:
##########
@@ -641,17 +641,15 @@ private Map<StreamConfig.SourceInputConfig, ChainedSource> createChainedSources(
 @Nullable
 private Counter getOperatorRecordsOutCounter(
 StreamTask<?, ?> containingTask, StreamConfig operatorConfig) {
- ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
- StreamOperatorFactory<?> operatorFactory =
- operatorConfig.getStreamOperatorFactory(userCodeClassloader);
+ String streamOperatorFactoryClassName = operatorConfig.getStreamOperatorFactoryClassName();
 // Do not use the numRecordsOut counter on output if this operator is SinkWriterOperator.
 //
 // Metric "numRecordsOut" is defined as the total number of records written to the
 // external system in FLIP-33, but this metric is occupied in AbstractStreamOperator as the
 // number of records sent to downstream operators, which is number of Committable batches
 // sent to SinkCommitter. So we skip registering this metric on output and leave this metric
 // to sink writer implementations to report.
- if (operatorFactory instanceof SinkWriterOperatorFactory) {
+ if (SinkWriterOperatorFactory.class.getName().equals(streamOperatorFactoryClassName)) {
Review Comment:
Thanks @pnowojski for the review!
I see that `SinkWriterOperatorFactory` is currently a final class. Are you worried
that this might change in the future?
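
To make the question concrete, here is a minimal sketch of how the two checks in the diff would diverge if the class stopped being final. `WriterFactory`, `CustomWriterFactory`, and `MapFactory` are hypothetical stand-ins and not Flink classes; the only assumption is that a subclass could exist.

```java
// Hypothetical stand-ins for illustration only; these are NOT Flink's classes.
class FactoryCheckSketch {
    interface OperatorFactory {}

    // Stand-in for the sink writer factory, deliberately left non-final here.
    static class WriterFactory implements OperatorFactory {}

    // A subclass like this is only possible while WriterFactory is non-final.
    static class CustomWriterFactory extends WriterFactory {}

    static class MapFactory implements OperatorFactory {}

    /** Type check: needs a factory instance and also matches subclasses. */
    static boolean isWriterByInstance(OperatorFactory factory) {
        return factory instanceof WriterFactory;
    }

    /** Name check: needs only the class name and matches the exact class only. */
    static boolean isWriterByName(String factoryClassName) {
        return WriterFactory.class.getName().equals(factoryClassName);
    }

    public static void main(String[] args) {
        OperatorFactory[] factories = {
            new WriterFactory(), new CustomWriterFactory(), new MapFactory()
        };
        for (OperatorFactory f : factories) {
            System.out.printf(
                    "%s: instanceof=%b, nameEquals=%b%n",
                    f.getClass().getSimpleName(),
                    isWriterByInstance(f),
                    isWriterByName(f.getClass().getName()));
        }
    }
}
```

In this sketch the two checks agree for the exact class and for the unrelated class, and differ only for the subclass. So as long as the real class stays final, the name comparison should behave like the `instanceof` check.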