[ 
https://issues.apache.org/jira/browse/FLINK-25103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wangbaohua updated FLINK-25103:
-------------------------------
    Summary: KeyedBroadcastProcessFunction run set 6, parallelism ValueState 
variables A  (was: KeyedBroadcastProcessFunction run set 6, parallelism 
ValueState variables A, could you tell me how to store in the six tasks A  )

> KeyedBroadcastProcessFunction run set 6, parallelism ValueState variables A
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-25103
>                 URL: https://issues.apache.org/jira/browse/FLINK-25103
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Task
>    Affects Versions: 1.14.0
>            Reporter: wangbaohua
>            Priority: Blocker
>
> I run a KeyedBroadcastProcessFunction with parallelism set to 6, and it keeps
> a ValueState variable A. How is A stored across the six tasks? While the job
> was running, I observed that some tasks read A as null, while others got a
> value. The code is as follows:
> ....
> setParallelism(9);
> ......
> public class dealStreamProcessFunction extends 
> KeyedBroadcastProcessFunction<String, StandardEvent, List<String>, 
> StandardEvent> {
>     private static final Logger logger = 
> LoggerFactory.getLogger(dealStreamProcessFunction.class);
>     private transient ValueState<List<StandardEvent>> listState;
>     private transient ValueState<Boolean> runingFlagState;
>     private transient ValueState<InferenceEngine> engineState;
>     MapStateDescriptor<String, List<String>> ruleStateDescriptor = new 
> MapStateDescriptor<>(ContextInfo.RULE_SBROAD_CAST_STATE
>             , BasicTypeInfo.STRING_TYPE_INFO
>             , new ListTypeInfo<>(String.class));
>     InferenceEngine engine;
>     /**
>      * The open method is executed only once.
>      * Initialization logic can be implemented here.
>      *
>      * @param parameters
>      * @throws Exception
>      */
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         ValueStateDescriptor<List<StandardEvent>> recentOperatorsDescriptor = 
> new ValueStateDescriptor<List<StandardEvent>>(
>                 "recent-operator",
>                 TypeInformation.of(new TypeHint<List<StandardEvent>>() {
>                 }));
>         ValueStateDescriptor<Boolean> runingFlagDescriptor = new 
> ValueStateDescriptor<Boolean>(
>                 "runingFlag",
>                 Boolean.class);
>         ValueStateDescriptor<InferenceEngine> engineDescriptor = new 
> ValueStateDescriptor<InferenceEngine>(
>                 "runingFlag1",
>                 InferenceEngine.class);
>         engineState = getRuntimeContext().getState(engineDescriptor);
>         listState = getRuntimeContext().getState(recentOperatorsDescriptor);
>         runingFlagState = getRuntimeContext().getState(runingFlagDescriptor);
>         logger.info("KeyedBroadcastProcessFunction open");
>     }
>     @Override
>     public void processElement(StandardEvent standardEvent, ReadOnlyContext 
> readOnlyContext, Collector<StandardEvent> collector) throws Exception {
>         if(standardEvent == null){
>             return;
>         }
>         List<String> list = null;
>         list = 
> readOnlyContext.getBroadcastState(ruleStateDescriptor).get(ContextInfo.RULE_SBROAD_CAST_STATE);
>         if (list == null) {
>             logger.info("RulesBroadcastState is null..............");
>             List<StandardEvent> lst = listState.value();
>             if (lst == null) {
>                 lst = new ArrayList<>();
>             }
>             lst.add(standardEvent);
>             listState.update(lst);
>             return;
>         }
>         //第一次进来
>         if (runingFlagState.value() == null) {
>             logger.info("runingFlagState.value() == null");
>             runingFlagState.update(true);
>         }
>         if (((runingFlagState.value() && list.get(0).equals("1")) || 
> list.get(0).equals("0"))) {
>             logger.info("action update.....:" + list.size() + ":" + 
> runingFlagState.value() + ":" + list.get(0));
>             String flag = list.get(0);
>             list.remove(0);
>             InferenceEngine engine1 = 
> InferenceEngine.compile(RuleReader.parseRules(list));
>             engineState.update(engine1);
>             if (runingFlagState.value() && flag.equals("1")) {
>                 runingFlagState.update(false);
>             }
>         }
>         if (engineState.value() != null) {
>             List<StandardEvent> listTmp = listState.value();
>             if (listTmp != null) {
>                 for (StandardEvent standardEventTmp : listTmp) {
>                     logger.info("listState.....:" + standardEventTmp);
>                     match(standardEventTmp, collector);
>                 }
>                 listState.clear();
>             }
>             match(standardEvent, collector);
>         } else {
>             logger.info("processElement engine is null.....:");
>         }
>     }
>     private void match(StandardEvent standardEvent, Collector<StandardEvent> 
> collector) throws IOException {
>         PatternMatcher matcher = engineState.value().matcher(standardEvent);
>         if (matcher.find()) {
>             List<Action> actions = matcher.getActions();
>             for (Action action : actions) {
>                 if (standardEvent != null) {
>                     if(collector != null)
>                         collector.collect(standardEvent);
>                     else
>                         logger.info("collector is null:" );
>                 }
>             }
>         } else {
>             logger.info("no matcher:" + standardEvent);
>         }
>     }
>     @Override
>     public void processBroadcastElement(List<String> strings, Context 
> context, Collector<StandardEvent> collector) throws Exception {
>         BroadcastState<String, List<String>> broadcastState = 
> context.getBroadcastState(ruleStateDescriptor);
>         logger.info("processBroadcastElement.....:" + strings.size());
>         if (broadcastState.contains(ContextInfo.RULE_SBROAD_CAST_STATE)) {
>             List<String> oldList = 
> broadcastState.get(ContextInfo.RULE_SBROAD_CAST_STATE);
>             logger.info("get State:" + oldList.size() + "  replaced with 
> State:" + strings.size());
>         } else {
>             logger.info("do not find old State, put first counterState {}", 
> strings.size());
>         }
>         broadcastState.put(ContextInfo.RULE_SBROAD_CAST_STATE, strings);
>     }
> }



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to