Sanil15 commented on code in PR #22576:
URL: https://github.com/apache/beam/pull/22576#discussion_r951968642
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java:
##########
@@ -51,20 +54,23 @@
private static final Logger LOG = LoggerFactory.getLogger(OpAdapter.class);
private final Op<InT, OutT, K> op;
+ private final String opName;
private transient List<OpMessage<OutT>> outputList;
private transient CompletionStage<Collection<OpMessage<OutT>>> outputFuture;
private transient Instant outputWatermark;
private transient OpEmitter<OutT> emitter;
private transient Config config;
private transient Context context;
+ private transient List<SamzaPipelineExceptionListener.Registrar>
exceptionListeners;
public static <InT, OutT, K> AsyncFlatMapFunction<OpMessage<InT>,
OpMessage<OutT>> adapt(
- Op<InT, OutT, K> op) {
- return new OpAdapter<>(op);
+ Op<InT, OutT, K> op, String opName) {
Review Comment:
I wired transformName instead of the context because OpAdapter is shared by
both portable mode and classic mode. However, PortableTranslationContext and
TranslationContext do not share a common supertype, so passing the context
would mean handling each one differently in the different translators.
If you feel this becomes important in the future, I will do the
refactoring.
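
To make the trade-off concrete, here is a minimal sketch of the approach described above. It is not the Beam code: `ClassicContext`, `PortableContext`, `Adapter`, and `describeFailure` are hypothetical stand-ins for TranslationContext, PortableTranslationContext, and OpAdapter, and the `name()` accessors are assumed for illustration only. The point is that an adapter which takes a plain String works with both context types without needing a shared supertype.

```java
public class TransformNameSketch {
  // Hypothetical stand-ins for the two translation contexts.
  // They deliberately share no common interface or base class.
  public static final class ClassicContext {
    public String name() {
      return "classic/ParDo";
    }
  }

  public static final class PortableContext {
    public String name() {
      return "portable/ParDo";
    }
  }

  // Hypothetical stand-in for OpAdapter: it depends only on a String,
  // so it does not care which kind of context produced the name.
  public static final class Adapter {
    private final String opName;

    private Adapter(String opName) {
      this.opName = opName;
    }

    public static Adapter adapt(String opName) {
      return new Adapter(opName);
    }

    // Attaches the transform name to a failure message.
    public String describeFailure(Exception e) {
      return opName + ": " + e.getMessage();
    }
  }

  public static void main(String[] args) {
    // Each translator extracts the name from its own context type.
    Adapter classic = Adapter.adapt(new ClassicContext().name());
    Adapter portable = Adapter.adapt(new PortableContext().name());

    System.out.println(classic.describeFailure(new RuntimeException("boom")));
    System.out.println(portable.describeFailure(new RuntimeException("boom")));
  }
}
```

Passing the context itself would need either two overloads of `adapt` or a new common interface for both contexts, which is the refactoring mentioned above.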