[
https://issues.apache.org/jira/browse/FLINK-25103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Piotr Nowojski updated FLINK-25103:
-----------------------------------
Priority: Major (was: Blocker)
> KeyedBroadcastProcessFunction run set 6, parallelism ValueState variables A
> ---------------------------------------------------------------------------
>
> Key: FLINK-25103
> URL: https://issues.apache.org/jira/browse/FLINK-25103
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Task
> Affects Versions: 1.14.0
> Reporter: wangbaohua
> Priority: Major
>
> I am running a KeyedBroadcastProcessFunction with parallelism set to 6, and it
> holds a ValueState variable A. How is A stored across the six tasks? While the
> job was running, I observed that some tasks read A as null, while others got a
> value. The code is as follows:
> ....
> setParallelism(9);
> ......
> public class dealStreamProcessFunction extends
> KeyedBroadcastProcessFunction<String, StandardEvent, List<String>,
> StandardEvent> {
> private static final Logger logger =
> LoggerFactory.getLogger(dealStreamProcessFunction.class);
> private transient ValueState<List<StandardEvent>> listState;
> private transient ValueState<Boolean> runingFlagState;
> private transient ValueState<InferenceEngine> engineState;
> MapStateDescriptor<String, List<String>> ruleStateDescriptor = new
> MapStateDescriptor<>(ContextInfo.RULE_SBROAD_CAST_STATE
> , BasicTypeInfo.STRING_TYPE_INFO
> , new ListTypeInfo<>(String.class));
> InferenceEngine engine;
> /**
> * open方法只会执行一次
> * 可以在这实现初始化的功能
> *
> * @param parameters
> * @throws Exception
> */
> @Override
> public void open(Configuration parameters) throws Exception {
> super.open(parameters);
> ValueStateDescriptor<List<StandardEvent>> recentOperatorsDescriptor =
> new ValueStateDescriptor<List<StandardEvent>>(
> "recent-operator",
> TypeInformation.of(new TypeHint<List<StandardEvent>>() {
> }));
> ValueStateDescriptor<Boolean> runingFlagDescriptor = new
> ValueStateDescriptor<Boolean>(
> "runingFlag",
> Boolean.class);
> ValueStateDescriptor<InferenceEngine> engineDescriptor = new
> ValueStateDescriptor<InferenceEngine>(
> "runingFlag1",
> InferenceEngine.class);
> engineState = getRuntimeContext().getState(engineDescriptor);
> listState = getRuntimeContext().getState(recentOperatorsDescriptor);
> runingFlagState = getRuntimeContext().getState(runingFlagDescriptor);
> logger.info("KeyedBroadcastProcessFunction open");
> }
> @Override
> public void processElement(StandardEvent standardEvent, ReadOnlyContext
> readOnlyContext, Collector<StandardEvent> collector) throws Exception {
> if(standardEvent == null){
> return;
> }
> List<String> list = null;
> list =
> readOnlyContext.getBroadcastState(ruleStateDescriptor).get(ContextInfo.RULE_SBROAD_CAST_STATE);
> if (list == null) {
> logger.info("RulesBroadcastState is null..............");
> List<StandardEvent> lst = listState.value();
> if (lst == null) {
> lst = new ArrayList<>();
> }
> lst.add(standardEvent);
> listState.update(lst);
> return;
> }
> //第一次进来
> if (runingFlagState.value() == null) {
> logger.info("runingFlagState.value() == null");
> runingFlagState.update(true);
> }
> if (((runingFlagState.value() && list.get(0).equals("1")) ||
> list.get(0).equals("0"))) {
> logger.info("action update.....:" + list.size() + ":" +
> runingFlagState.value() + ":" + list.get(0));
> String flag = list.get(0);
> list.remove(0);
> InferenceEngine engine1 =
> InferenceEngine.compile(RuleReader.parseRules(list));
> engineState.update(engine1);
> if (runingFlagState.value() && flag.equals("1")) {
> runingFlagState.update(false);
> }
> }
> if (engineState.value() != null) {
> List<StandardEvent> listTmp = listState.value();
> if (listTmp != null) {
> for (StandardEvent standardEventTmp : listTmp) {
> logger.info("listState.....:" + standardEventTmp);
> match(standardEventTmp, collector);
> }
> listState.clear();
> }
> match(standardEvent, collector);
> } else {
> logger.info("processElement engine is null.....:");
> }
> }
> private void match(StandardEvent standardEvent, Collector<StandardEvent>
> collector) throws IOException {
> PatternMatcher matcher = engineState.value().matcher(standardEvent);
> if (matcher.find()) {
> List<Action> actions = matcher.getActions();
> for (Action action : actions) {
> if (standardEvent != null) {
> if(collector != null)
> collector.collect(standardEvent);
> else
> logger.info("collector is null:" );
> }
> }
> } else {
> logger.info("no matcher:" + standardEvent);
> }
> }
> @Override
> public void processBroadcastElement(List<String> strings, Context
> context, Collector<StandardEvent> collector) throws Exception {
> BroadcastState<String, List<String>> broadcastState =
> context.getBroadcastState(ruleStateDescriptor);
> logger.info("processBroadcastElement.....:" + strings.size());
> if (broadcastState.contains(ContextInfo.RULE_SBROAD_CAST_STATE)) {
> List<String> oldList =
> broadcastState.get(ContextInfo.RULE_SBROAD_CAST_STATE);
> logger.info("get State:" + oldList.size() + " replaced with
> State:" + strings.size());
> } else {
> logger.info("do not find old State, put first counterState {}",
> strings.size());
> }
> broadcastState.put(ContextInfo.RULE_SBROAD_CAST_STATE, strings);
> }
> }
--
This message was sent by Atlassian Jira
(v8.20.1#820001)