zhipeng93 commented on a change in pull request #18:
URL: https://github.com/apache/flink-ml/pull/18#discussion_r744116862
##########
File path:
flink-ml-lib/src/main/java/org/apache/flink/ml/common/broadcast/BroadcastStreamingRuntimeContext.java
##########
@@ -0,0 +1,68 @@
+package org.apache.flink.ml.common.broadcast;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A subclass of {@link StreamingRuntimeContext} that provides access to broadcast
+ * variables.
+ */
+public class BroadcastStreamingRuntimeContext extends StreamingRuntimeContext {
+
+ Map<String, List<?>> broadcastVariables = new HashMap<>();
+
+ public BroadcastStreamingRuntimeContext(
+ Environment env,
+ Map<String, Accumulator<?, ?>> accumulators,
+ OperatorMetricGroup operatorMetricGroup,
+ OperatorID operatorID,
+ ProcessingTimeService processingTimeService,
+ @Nullable KeyedStateStore keyedStateStore,
+ ExternalResourceInfoProvider externalResourceInfoProvider) {
+ super(
+ env,
+ accumulators,
+ operatorMetricGroup,
+ operatorID,
+ processingTimeService,
+ keyedStateStore,
+ externalResourceInfoProvider);
+ }
+
+ @Override
+ public boolean hasBroadcastVariable(String name) {
+ return broadcastVariables.containsKey(name);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <RT> List<RT> getBroadcastVariable(String name) {
+ return (List<RT>) broadcastVariables.get(name);
Review comment:
Hi Yun, good question.
I would like to throw an exception if users call `getBroadcastVariable`
before all the elements of the broadcast variables have been received, i.e.,
before `processElement` or `processWatermark` is called. I believe that is
enough for developing ML algorithms for now.
By the way, we can optimize this later if a real use case comes up.
What do you think?
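To make the proposal concrete, here is a minimal sketch of the guard, written without any Flink dependency. The class `BroadcastContextSketch` and the method `setBroadcastVariable` are hypothetical names used only for illustration; the sketch assumes a variable is put into the map only after all of its elements have arrived, and the choice of `IllegalStateException` is likewise an assumption rather than what the PR does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the runtime context; not the actual Flink ML class. */
class BroadcastContextSketch {

    // Assumption: a name appears in this map only once all of its elements are received.
    private final Map<String, List<?>> broadcastVariables = new HashMap<>();

    /** Hypothetical hook, called after every element of the variable has arrived. */
    void setBroadcastVariable(String name, List<?> elements) {
        broadcastVariables.put(name, new ArrayList<>(elements));
    }

    boolean hasBroadcastVariable(String name) {
        return broadcastVariables.containsKey(name);
    }

    @SuppressWarnings("unchecked")
    <RT> List<RT> getBroadcastVariable(String name) {
        // Fail fast instead of returning null when the variable is not ready.
        if (!broadcastVariables.containsKey(name)) {
            throw new IllegalStateException(
                    "Broadcast variable '" + name + "' has not been fully received yet.");
        }
        return (List<RT>) broadcastVariables.get(name);
    }

    public static void main(String[] args) {
        BroadcastContextSketch ctx = new BroadcastContextSketch();
        try {
            ctx.getBroadcastVariable("model"); // too early: nothing received yet
        } catch (IllegalStateException e) {
            System.out.println("Rejected early access: " + e.getMessage());
        }
        ctx.setBroadcastVariable("model", Arrays.asList(1.0, 2.0));
        List<Double> model = ctx.getBroadcastVariable("model");
        System.out.println("Received " + model.size() + " elements");
    }
}
```

With this shape, user code that touches the variable in `open()` would get a clear error rather than a silent `null`, while calls made from `processElement` or `processWatermark` would succeed, provided the operator only forwards records after the broadcast inputs are complete.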