[
https://issues.apache.org/jira/browse/FLINK-25103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17453848#comment-17453848
]
Piotr Nowojski commented on FLINK-25103:
----------------------------------------
Hey [~wangbaohua]. A note for the future: it's much better to ask such questions
on [the user mailing
list|https://flink.apache.org/community.html#mailing-lists]. You would get help
and a response there much more quickly than by filing a ticket.
Getting back to your question, what do you mean by "ValueState variables A"
in the context of the shared code snippet? It would also be helpful (both for
us and for you) if you could create a minimal example, with some
artificial data source, that we can run and that would show the problem.
> KeyedBroadcastProcessFunction run set 6, parallelism ValueState variables A
> ---------------------------------------------------------------------------
>
> Key: FLINK-25103
> URL: https://issues.apache.org/jira/browse/FLINK-25103
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Task
> Affects Versions: 1.14.0
> Reporter: wangbaohua
> Priority: Major
>
> I run a KeyedBroadcastProcessFunction with parallelism set to 6, and it uses
> a ValueState variable A. How is A stored across the six tasks? When running,
> I observed that in some tasks the fetched variable A was null, while in
> others it had a value. The code is as follows:
> ....
> setParallelism(9);
> ......
> public class dealStreamProcessFunction extends
> KeyedBroadcastProcessFunction<String, StandardEvent, List<String>,
> StandardEvent> {
> private static final Logger logger =
> LoggerFactory.getLogger(dealStreamProcessFunction.class);
> private transient ValueState<List<StandardEvent>> listState;
> private transient ValueState<Boolean> runingFlagState;
> private transient ValueState<InferenceEngine> engineState;
> MapStateDescriptor<String, List<String>> ruleStateDescriptor = new
> MapStateDescriptor<>(ContextInfo.RULE_SBROAD_CAST_STATE
> , BasicTypeInfo.STRING_TYPE_INFO
> , new ListTypeInfo<>(String.class));
> InferenceEngine engine;
> /**
> * open方法只会执行一次
> * 可以在这实现初始化的功能
> *
> * @param parameters
> * @throws Exception
> */
> @Override
> public void open(Configuration parameters) throws Exception {
> super.open(parameters);
> ValueStateDescriptor<List<StandardEvent>> recentOperatorsDescriptor =
> new ValueStateDescriptor<List<StandardEvent>>(
> "recent-operator",
> TypeInformation.of(new TypeHint<List<StandardEvent>>() {
> }));
> ValueStateDescriptor<Boolean> runingFlagDescriptor = new
> ValueStateDescriptor<Boolean>(
> "runingFlag",
> Boolean.class);
> ValueStateDescriptor<InferenceEngine> engineDescriptor = new
> ValueStateDescriptor<InferenceEngine>(
> "runingFlag1",
> InferenceEngine.class);
> engineState = getRuntimeContext().getState(engineDescriptor);
> listState = getRuntimeContext().getState(recentOperatorsDescriptor);
> runingFlagState = getRuntimeContext().getState(runingFlagDescriptor);
> logger.info("KeyedBroadcastProcessFunction open");
> }
> @Override
> public void processElement(StandardEvent standardEvent, ReadOnlyContext
> readOnlyContext, Collector<StandardEvent> collector) throws Exception {
> if(standardEvent == null){
> return;
> }
> List<String> list = null;
> list =
> readOnlyContext.getBroadcastState(ruleStateDescriptor).get(ContextInfo.RULE_SBROAD_CAST_STATE);
> if (list == null) {
> logger.info("RulesBroadcastState is null..............");
> List<StandardEvent> lst = listState.value();
> if (lst == null) {
> lst = new ArrayList<>();
> }
> lst.add(standardEvent);
> listState.update(lst);
> return;
> }
> //第一次进来
> if (runingFlagState.value() == null) {
> logger.info("runingFlagState.value() == null");
> runingFlagState.update(true);
> }
> if (((runingFlagState.value() && list.get(0).equals("1")) ||
> list.get(0).equals("0"))) {
> logger.info("action update.....:" + list.size() + ":" +
> runingFlagState.value() + ":" + list.get(0));
> String flag = list.get(0);
> list.remove(0);
> InferenceEngine engine1 =
> InferenceEngine.compile(RuleReader.parseRules(list));
> engineState.update(engine1);
> if (runingFlagState.value() && flag.equals("1")) {
> runingFlagState.update(false);
> }
> }
> if (engineState.value() != null) {
> List<StandardEvent> listTmp = listState.value();
> if (listTmp != null) {
> for (StandardEvent standardEventTmp : listTmp) {
> logger.info("listState.....:" + standardEventTmp);
> match(standardEventTmp, collector);
> }
> listState.clear();
> }
> match(standardEvent, collector);
> } else {
> logger.info("processElement engine is null.....:");
> }
> }
> private void match(StandardEvent standardEvent, Collector<StandardEvent>
> collector) throws IOException {
> PatternMatcher matcher = engineState.value().matcher(standardEvent);
> if (matcher.find()) {
> List<Action> actions = matcher.getActions();
> for (Action action : actions) {
> if (standardEvent != null) {
> if(collector != null)
> collector.collect(standardEvent);
> else
> logger.info("collector is null:" );
> }
> }
> } else {
> logger.info("no matcher:" + standardEvent);
> }
> }
> @Override
> public void processBroadcastElement(List<String> strings, Context
> context, Collector<StandardEvent> collector) throws Exception {
> BroadcastState<String, List<String>> broadcastState =
> context.getBroadcastState(ruleStateDescriptor);
> logger.info("processBroadcastElement.....:" + strings.size());
> if (broadcastState.contains(ContextInfo.RULE_SBROAD_CAST_STATE)) {
> List<String> oldList =
> broadcastState.get(ContextInfo.RULE_SBROAD_CAST_STATE);
> logger.info("get State:" + oldList.size() + " replaced with
> State:" + strings.size());
> } else {
> logger.info("do not find old State, put first counterState {}",
> strings.size());
> }
> broadcastState.put(ContextInfo.RULE_SBROAD_CAST_STATE, strings);
> }
> }
--
This message was sent by Atlassian Jira
(v8.20.1#820001)