[ 
https://issues.apache.org/jira/browse/FLINK-25103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17453848#comment-17453848
 ] 

Piotr Nowojski commented on FLINK-25103:
----------------------------------------

Hey [~wangbaohua]. Note for the future, it's much better to ask such questions 
on [the user mailing 
list|https://flink.apache.org/community.html#mailing-lists]. You would get help 
and response there much quicker than via filing a ticket.

Getting back to your question, what do you mean by "ValueState variables A" 
with the context of the shared code snippet? It would be also helpful (both for 
us and for you) if you could create a minimalistic example, with some 
artificial data source, that we can run and that would show the problem.

> KeyedBroadcastProcessFunction run set 6, parallelism ValueState variables A
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-25103
>                 URL: https://issues.apache.org/jira/browse/FLINK-25103
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Task
>    Affects Versions: 1.14.0
>            Reporter: wangbaohua
>            Priority: Major
>
> KeyedBroadcastProcessFunction run set 6, parallelism ValueState variables A, 
> excuse me how A stored in the six tasks.  When I was running, I observed that 
> some tasks fetched variable A was null, while others had values  .The 
> following code  :
> ....
> setParallelism(9);
> ......
> public class dealStreamProcessFunction extends 
> KeyedBroadcastProcessFunction<String, StandardEvent, List<String>, 
> StandardEvent> {
>     private static final Logger logger = 
> LoggerFactory.getLogger(dealStreamProcessFunction.class);
>     private transient ValueState<List<StandardEvent>> listState;
>     private transient ValueState<Boolean> runingFlagState;
>     private transient ValueState<InferenceEngine> engineState;
>     MapStateDescriptor<String, List<String>> ruleStateDescriptor = new 
> MapStateDescriptor<>(ContextInfo.RULE_SBROAD_CAST_STATE
>             , BasicTypeInfo.STRING_TYPE_INFO
>             , new ListTypeInfo<>(String.class));
>     InferenceEngine engine;
>     /**
>      * open方法只会执行一次
>      * 可以在这实现初始化的功能
>      *
>      * @param parameters
>      * @throws Exception
>      */
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         ValueStateDescriptor<List<StandardEvent>> recentOperatorsDescriptor = 
> new ValueStateDescriptor<List<StandardEvent>>(
>                 "recent-operator",
>                 TypeInformation.of(new TypeHint<List<StandardEvent>>() {
>                 }));
>         ValueStateDescriptor<Boolean> runingFlagDescriptor = new 
> ValueStateDescriptor<Boolean>(
>                 "runingFlag",
>                 Boolean.class);
>         ValueStateDescriptor<InferenceEngine> engineDescriptor = new 
> ValueStateDescriptor<InferenceEngine>(
>                 "runingFlag1",
>                 InferenceEngine.class);
>         engineState = getRuntimeContext().getState(engineDescriptor);
>         listState = getRuntimeContext().getState(recentOperatorsDescriptor);
>         runingFlagState = getRuntimeContext().getState(runingFlagDescriptor);
>         logger.info("KeyedBroadcastProcessFunction open");
>     }
>     @Override
>     public void processElement(StandardEvent standardEvent, ReadOnlyContext 
> readOnlyContext, Collector<StandardEvent> collector) throws Exception {
>         if(standardEvent == null){
>             return;
>         }
>         List<String> list = null;
>         list = 
> readOnlyContext.getBroadcastState(ruleStateDescriptor).get(ContextInfo.RULE_SBROAD_CAST_STATE);
>         if (list == null) {
>             logger.info("RulesBroadcastState is null..............");
>             List<StandardEvent> lst = listState.value();
>             if (lst == null) {
>                 lst = new ArrayList<>();
>             }
>             lst.add(standardEvent);
>             listState.update(lst);
>             return;
>         }
>         //第一次进来
>         if (runingFlagState.value() == null) {
>             logger.info("runingFlagState.value() == null");
>             runingFlagState.update(true);
>         }
>         if (((runingFlagState.value() && list.get(0).equals("1")) || 
> list.get(0).equals("0"))) {
>             logger.info("action update.....:" + list.size() + ":" + 
> runingFlagState.value() + ":" + list.get(0));
>             String flag = list.get(0);
>             list.remove(0);
>             InferenceEngine engine1 = 
> InferenceEngine.compile(RuleReader.parseRules(list));
>             engineState.update(engine1);
>             if (runingFlagState.value() && flag.equals("1")) {
>                 runingFlagState.update(false);
>             }
>         }
>         if (engineState.value() != null) {
>             List<StandardEvent> listTmp = listState.value();
>             if (listTmp != null) {
>                 for (StandardEvent standardEventTmp : listTmp) {
>                     logger.info("listState.....:" + standardEventTmp);
>                     match(standardEventTmp, collector);
>                 }
>                 listState.clear();
>             }
>             match(standardEvent, collector);
>         } else {
>             logger.info("processElement engine is null.....:");
>         }
>     }
>     private void match(StandardEvent standardEvent, Collector<StandardEvent> 
> collector) throws IOException {
>         PatternMatcher matcher = engineState.value().matcher(standardEvent);
>         if (matcher.find()) {
>             List<Action> actions = matcher.getActions();
>             for (Action action : actions) {
>                 if (standardEvent != null) {
>                     if(collector != null)
>                         collector.collect(standardEvent);
>                     else
>                         logger.info("collector is null:" );
>                 }
>             }
>         } else {
>             logger.info("no matcher:" + standardEvent);
>         }
>     }
>     @Override
>     public void processBroadcastElement(List<String> strings, Context 
> context, Collector<StandardEvent> collector) throws Exception {
>         BroadcastState<String, List<String>> broadcastState = 
> context.getBroadcastState(ruleStateDescriptor);
>         logger.info("processBroadcastElement.....:" + strings.size());
>         if (broadcastState.contains(ContextInfo.RULE_SBROAD_CAST_STATE)) {
>             List<String> oldList = 
> broadcastState.get(ContextInfo.RULE_SBROAD_CAST_STATE);
>             logger.info("get State:" + oldList.size() + "  replaced with 
> State:" + strings.size());
>         } else {
>             logger.info("do not find old State, put first counterState {}", 
> strings.size());
>         }
>         broadcastState.put(ContextInfo.RULE_SBROAD_CAST_STATE, strings);
>     }
> }



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to