wangbaohua created FLINK-25103:
----------------------------------
Summary: KeyedBroadcastProcessFunction runs with parallelism 6 and a
ValueState variable A; how is A stored across the six tasks?
Key: FLINK-25103
URL: https://issues.apache.org/jira/browse/FLINK-25103
Project: Flink
Issue Type: Bug
Components: Runtime / Task
Affects Versions: 1.14.0
Reporter: wangbaohua
A KeyedBroadcastProcessFunction runs with parallelism 6 and holds a ValueState
variable A. How is A stored across the six tasks? While the job was running, I
observed that some tasks read variable A as null, while others got a value. The
code is as follows:
....
setParallelism(9);
......
public class dealStreamProcessFunction extends
KeyedBroadcastProcessFunction<String, StandardEvent, List<String>,
StandardEvent> {
private static final Logger logger =
LoggerFactory.getLogger(dealStreamProcessFunction.class);
private transient ValueState<List<StandardEvent>> listState;
private transient ValueState<Boolean> runingFlagState;
private transient ValueState<InferenceEngine> engineState;
MapStateDescriptor<String, List<String>> ruleStateDescriptor = new
MapStateDescriptor<>(ContextInfo.RULE_SBROAD_CAST_STATE
, BasicTypeInfo.STRING_TYPE_INFO
, new ListTypeInfo<>(String.class));
InferenceEngine engine;
/**
* open方法只会执行一次
* 可以在这实现初始化的功能
*
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ValueStateDescriptor<List<StandardEvent>> recentOperatorsDescriptor =
new ValueStateDescriptor<List<StandardEvent>>(
"recent-operator",
TypeInformation.of(new TypeHint<List<StandardEvent>>() {
}));
ValueStateDescriptor<Boolean> runingFlagDescriptor = new
ValueStateDescriptor<Boolean>(
"runingFlag",
Boolean.class);
ValueStateDescriptor<InferenceEngine> engineDescriptor = new
ValueStateDescriptor<InferenceEngine>(
"runingFlag1",
InferenceEngine.class);
engineState = getRuntimeContext().getState(engineDescriptor);
listState = getRuntimeContext().getState(recentOperatorsDescriptor);
runingFlagState = getRuntimeContext().getState(runingFlagDescriptor);
logger.info("KeyedBroadcastProcessFunction open");
}
@Override
public void processElement(StandardEvent standardEvent, ReadOnlyContext
readOnlyContext, Collector<StandardEvent> collector) throws Exception {
if(standardEvent == null){
return;
}
List<String> list = null;
list =
readOnlyContext.getBroadcastState(ruleStateDescriptor).get(ContextInfo.RULE_SBROAD_CAST_STATE);
if (list == null) {
logger.info("RulesBroadcastState is null..............");
List<StandardEvent> lst = listState.value();
if (lst == null) {
lst = new ArrayList<>();
}
lst.add(standardEvent);
listState.update(lst);
return;
}
//第一次进来
if (runingFlagState.value() == null) {
logger.info("runingFlagState.value() == null");
runingFlagState.update(true);
}
if (((runingFlagState.value() && list.get(0).equals("1")) ||
list.get(0).equals("0"))) {
logger.info("action update.....:" + list.size() + ":" +
runingFlagState.value() + ":" + list.get(0));
String flag = list.get(0);
list.remove(0);
InferenceEngine engine1 =
InferenceEngine.compile(RuleReader.parseRules(list));
engineState.update(engine1);
if (runingFlagState.value() && flag.equals("1")) {
runingFlagState.update(false);
}
}
if (engineState.value() != null) {
List<StandardEvent> listTmp = listState.value();
if (listTmp != null) {
for (StandardEvent standardEventTmp : listTmp) {
logger.info("listState.....:" + standardEventTmp);
match(standardEventTmp, collector);
}
listState.clear();
}
match(standardEvent, collector);
} else {
logger.info("processElement engine is null.....:");
}
}
private void match(StandardEvent standardEvent, Collector<StandardEvent>
collector) throws IOException {
PatternMatcher matcher = engineState.value().matcher(standardEvent);
if (matcher.find()) {
List<Action> actions = matcher.getActions();
for (Action action : actions) {
if (standardEvent != null) {
if(collector != null)
collector.collect(standardEvent);
else
logger.info("collector is null:" );
}
}
} else {
logger.info("no matcher:" + standardEvent);
}
}
@Override
public void processBroadcastElement(List<String> strings, Context context,
Collector<StandardEvent> collector) throws Exception {
BroadcastState<String, List<String>> broadcastState =
context.getBroadcastState(ruleStateDescriptor);
logger.info("processBroadcastElement.....:" + strings.size());
if (broadcastState.contains(ContextInfo.RULE_SBROAD_CAST_STATE)) {
List<String> oldList =
broadcastState.get(ContextInfo.RULE_SBROAD_CAST_STATE);
logger.info("get State:" + oldList.size() + " replaced with
State:" + strings.size());
} else {
logger.info("do not find old State, put first counterState {}",
strings.size());
}
broadcastState.put(ContextInfo.RULE_SBROAD_CAST_STATE, strings);
}
}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)