zhipeng93 commented on code in PR #215:
URL: https://github.com/apache/flink-ml/pull/215#discussion_r1168137985


##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/broadcast/operator/AbstractBroadcastWrapperOperator.java:
##########
@@ -269,138 +200,162 @@ private OperatorMetricGroup createOperatorMetricGroup(
     }
 
     /**
-     * extracts common processing logic in subclasses' processing elements.
+     * Extracts common processing logic in subclasses' processing elements.
      *
-     * @param streamRecord the input record.
-     * @param inputIndex input id, starts from zero.
-     * @param elementConsumer the consumer function of StreamRecord, i.e.,
+     * @param streamRecord The input record.
+     * @param inputIndex Input id, starts from zero.
+     * @param elementConsumer The consumer function of StreamRecord, i.e.,
      *     operator.processElement(...).
-     * @param watermarkConsumer the consumer function of WaterMark, i.e.,
+     * @param watermarkConsumer The consumer function of WaterMark, i.e.,
      *     operator.processWatermark(...).
-     * @throws Exception possible exception.
+     * @param keyContextSetter The consumer function of setting key context, i.e.,
+     *     operator.setKeyContext(...).
+     * @throws Exception Possible exception.
      */
     @SuppressWarnings({"rawtypes", "unchecked"})
     protected void processElementX(
             StreamRecord streamRecord,
             int inputIndex,
             ThrowingConsumer<StreamRecord, Exception> elementConsumer,
-            ThrowingConsumer<Watermark, Exception> watermarkConsumer)
+            ThrowingConsumer<Watermark, Exception> watermarkConsumer,
+            ThrowingConsumer<StreamRecord, Exception> keyContextSetter)
             throws Exception {
-        if (!isBlocked[inputIndex]) {
-            if (areBroadcastVariablesReady()) {
-                if (hasPendingElements[inputIndex]) {
-                    processPendingElementsAndWatermarks(
-                            inputIndex, elementConsumer, watermarkConsumer);
-                    hasPendingElements[inputIndex] = false;
+        if (richContext.hasRichFunction) {
+            if (!richContext.isBlocked[inputIndex]) {
+                if (areBroadcastVariablesReady()) {
+                    if (richContext.hasPendingElements[inputIndex]) {
+                        processPendingElementsAndWatermarks(
+                                inputIndex, elementConsumer, watermarkConsumer, keyContextSetter);
+                        richContext.hasPendingElements[inputIndex] = false;
+                        keyContextSetter.accept(streamRecord);

Review Comment:
   I am afraid not. `keyContextSetter.accept(streamRecord);` sets the key context for the received streamRecord, not for the cached ones.
   
   The logic for setting the key context for cached elements lies in `processPendingElementsAndWatermarks`. [1]
   
   [1] https://github.com/apache/flink-ml/pull/215/files/a4e1f0576384762b22e9bbc264a020a6ec7bda39#diff-001f424b1d7ebde26cc633756f68c9c83d71b33fce0e9058d5b22ba2746e2959R357
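
   The distinction can be sketched in isolation. This is a minimal, self-contained illustration and not the actual Flink ML code: `KeyContextSketch`, `Record`, `onElement`, and `setBroadcastReady` are hypothetical names, and a plain field stands in for the operator's key context. It assumes that cached elements get their key set while the cache is drained, and that the newly received record needs its own key set afterwards.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class KeyContextSketch {
    /** Hypothetical stand-in for a keyed stream record. */
    static final class Record {
        final String key;
        final int value;

        Record(String key, int value) {
            this.key = key;
            this.value = value;
        }
    }

    private final Queue<Record> pending = new ArrayDeque<>();
    private final List<String> log = new ArrayList<>();
    private String currentKey; // stands in for the operator's key context
    private boolean broadcastReady;

    private void setKeyContext(Record r) {
        currentKey = r.key;
    }

    private void processElement(Record r) {
        // Records the key context that was active when the element was processed.
        log.add(currentKey + ":" + r.value);
    }

    void setBroadcastReady() {
        broadcastReady = true;
    }

    void onElement(Record received) {
        if (!broadcastReady) {
            pending.add(received); // cache until broadcast variables are ready
            return;
        }
        // Drain the cache: each cached element gets its own key context here.
        Record cached;
        while ((cached = pending.poll()) != null) {
            setKeyContext(cached);
            processElement(cached);
        }
        // The key context now points at the last cached element, so it must be
        // set again for the newly received record before processing it.
        setKeyContext(received);
        processElement(received);
    }

    static List<String> demo() {
        KeyContextSketch op = new KeyContextSketch();
        op.onElement(new Record("a", 1)); // cached
        op.onElement(new Record("b", 2)); // cached
        op.setBroadcastReady();
        op.onElement(new Record("c", 3)); // drains cache, then processes itself
        return op.log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

   In this sketch, dropping the final `setKeyContext(received)` call would process the received record under the key of the last cached element, which is the situation the extra `keyContextSetter.accept(streamRecord)` call guards against.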


