[ https://issues.apache.org/jira/browse/FLINK-8248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
jia liu updated FLINK-8248:
---------------------------
Description:
{panel:title=Here is my exception log}
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:291)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error while adding data to RocksDB
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:103)
    at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.updateNFA(AbstractKeyedCEPPatternOperator.java:309)
    at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:247)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:277)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:886)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
    ... 7 more
Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon', detector='SlidingWindowAnomalyDetector', measure='count', field='activity', dimension='Logoff', description='null', icons=null, startTimestamp=1465297200000, endTimestamp=1465297203600, count=11.0, anomalyScore=100, adHashCode=-1866791453, timeMap={1465297200000=11.0}, user='LMR0049', logQuery=null, group='null'}, 1465300799999, 0), [SharedBufferEdge(null, 199)], 1)
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
    at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:943)
    at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:806)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:100)
    ... 13 more
{panel}
Main job code:
{code:java}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new RocksDBStateBackend(getString("flink.backend-state-dir")));
// .........
DataStream<Behavior> behaviorStream = anomalyStream
        .assignTimestampsAndWatermarks(new AnomalyTimestampExtractor(Time.seconds(0)))
        .keyBy((KeySelector<AnomalySlice, String>) value -> value.entity)
        .window(SlidingEventTimeWindows.of(
                Time.seconds(getLong("flink.window.window-size")),
                Time.seconds(getLong("flink.window.slice-size"))))
        .apply(new BehaviorBuilderFunction())
        .filter(new WhitelistFilterFunction())
        // a non-keyed stream would force the pattern operator's parallelism to 1
        .keyBy((KeySelector<Behavior, String>) Behavior::getUser);

// CEP on the behavior stream
List<Pattern> allPatterns = PatternsHolder.getAllPatterns();
for (Pattern pa : allPatterns) {
    PatternStream<Behavior> ps = CEP.pattern(behaviorStream, pa);
    ps.select(new AlertGenerator(pa.getName())).name(pa.getName());
}
{code}
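For context ({{AnomalyTimestampExtractor}} is not included in this report): it is assumed here to be a standard bounded-out-of-orderness extractor over {{AnomalySlice}}. A minimal sketch under that assumption; the {{startTimestamp}} field name is a guess:
{code:java}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Sketch only, not the actual implementation from this job.
public class AnomalyTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<AnomalySlice> {

    public AnomalyTimestampExtractor(Time maxOutOfOrderness) {
        super(maxOutOfOrderness); // the job above passes Time.seconds(0): no out-of-orderness allowance
    }

    @Override
    public long extractTimestamp(AnomalySlice slice) {
        return slice.startTimestamp; // assumed event-time field on AnomalySlice
    }
}
{code}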
Keyed stream event class:
{code:java}
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class Behavior implements Serializable {

    private static final long serialVersionUID = 7786674623147772721L;

    static int ANOMALY_SCORE_THRESHOLD = 40;
    static int ANOMALY_COUNT_THRESHOLD = 3;

    public final String schema;
    public final String detector;
    private String measure = UEBAConstants.DEFAULT_MEASURE_FIELD;
    public final String dimension;
    public final String field; // dimension value
    private String user;
    public String group;
    public double count;
    public int anomalyScore;
    protected String description;
    private Icon[] icons;
    private int adHashCode;
    private long startTimestamp;
    private long endTimestamp;
    private Map<Long, Double> timeMap;
    public ArrayList<HashMap<String, Object>> logQuery;

    public Behavior(String schema, String detector, String field, String dimension, String user,
                    long fromMillis, long toMillis, double count, int anomalyScore,
                    ArrayList<HashMap<String, Object>> logQuery) {
        this.schema = schema;
        this.detector = detector;
        this.field = field;
        this.dimension = dimension;
        this.user = user;
        this.startTimestamp = fromMillis;
        this.endTimestamp = toMillis;
        this.count = count;
        this.anomalyScore = anomalyScore;
        this.logQuery = logQuery;
        timeMap = new HashMap<>();
        timeMap.put(fromMillis, count);
    }

    public Behavior(String schema, String detector, String field, String dimension,
                    long fromMillis, long toMillis, double count, int anomalyScore) {
        this.schema = schema;
        this.detector = detector;
        this.field = field;
        this.dimension = dimension;
        this.startTimestamp = fromMillis;
        this.endTimestamp = toMillis;
        this.count = count;
        this.anomalyScore = anomalyScore;
        timeMap = new HashMap<>();
        timeMap.put(fromMillis, count);
    }

    public String getGroup() {
        return group;
    }

    public void setGroup(String group) {
        this.group = group;
    }

    public void setAdHashCode(int hashCode) {
        this.adHashCode = hashCode;
    }

    public void setMeasure(String measure) {
        this.measure = measure;
    }

    public String getMeasure() {
        return measure;
    }

    // anomalyScore uses a weighted average, which may not be wise.
    public void add(long fromMillis, long toMillis, double count, int anomalyScore,
                    ArrayList<HashMap<String, Object>> logQuery) {
        double sum = this.count * this.anomalyScore + count * anomalyScore;
        this.count += count;
        this.anomalyScore = (int) (sum / this.count);
        if (fromMillis < this.startTimestamp) {
            this.startTimestamp = fromMillis;
        }
        if (toMillis > this.endTimestamp) {
            this.endTimestamp = toMillis;
        }
        if (!timeMap.containsKey(fromMillis)) {
            timeMap.put(fromMillis, 0.0);
        }
        timeMap.put(fromMillis, timeMap.get(fromMillis) + count);
        if (logQuery != null) {
            this.logQuery.addAll(logQuery);
        }
    }

    public void add(long fromMillis, long toMillis, double count, int anomalyScore) {
        double sum = this.count * this.anomalyScore + count * anomalyScore;
        this.count += count;
        this.anomalyScore = (int) (sum / this.count);
        if (fromMillis < this.startTimestamp) {
            this.startTimestamp = fromMillis;
        }
        if (toMillis > this.endTimestamp) {
            this.endTimestamp = toMillis;
        }
        if (!timeMap.containsKey(fromMillis)) {
            timeMap.put(fromMillis, 0.0);
        }
        timeMap.put(fromMillis, timeMap.get(fromMillis) + count);
    }

    public Long[] getTimestamps() {
        return timeMap.keySet().toArray(new Long[timeMap.size()]);
    }

    public String dimension() {
        return dimension;
    }

    public long startTimestamp() {
        return startTimestamp;
    }

    public long endTimestamp() {
        return endTimestamp;
    }

    public double count() {
        return count;
    }

    public int anomalyScore() {
        return anomalyScore;
    }

    public boolean isAnomaly() {
        return anomalyScore() >= ANOMALY_SCORE_THRESHOLD && count() >= ANOMALY_COUNT_THRESHOLD;
    }

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public void describeAs(String description, Icon... icons) {
        this.description = description;
        this.icons = icons;
    }

    public String setVisualizeInterfaceParameter(String group, long visualizeStartTimestamp,
                                                 long visualizeEndTimestamp) {
        return "/get_alert_visualize?detectorName=" + detector
                + "&groupField=" + group
                + "&user=" + user
                + "&field=" + field
                + "&measureField=" + measure
                + "&schemaName=" + schema
                + "&dimensionField=" + dimension
                + "&visualizeStartTimestamp=" + visualizeStartTimestamp
                + "&visualizeEndTimestamp=" + visualizeEndTimestamp;
    }

    @Override
    public int hashCode() {
        int result;
        long temp;
        result = schema != null ? schema.hashCode() : 0;
        result = 31 * result + (detector != null ? detector.hashCode() : 0);
        result = 31 * result + (measure != null ? measure.hashCode() : 0);
        result = 31 * result + (field != null ? field.hashCode() : 0);
        result = 31 * result + (dimension != null ? dimension.hashCode() : 0);
        result = 31 * result + (description != null ? description.hashCode() : 0);
        result = 31 * result + Arrays.hashCode(icons);
        result = 31 * result + (int) (startTimestamp ^ (startTimestamp >>> 32));
        result = 31 * result + (int) (endTimestamp ^ (endTimestamp >>> 32));
        temp = Double.doubleToLongBits(count);
        result = 31 * result + (int) (temp ^ (temp >>> 32));
        result = 31 * result + anomalyScore;
        result = 31 * result + adHashCode;
        result = 31 * result + (timeMap != null ? timeMap.hashCode() : 0);
        result = 31 * result + (user != null ? user.hashCode() : 0);
        result = 31 * result + (logQuery != null ? logQuery.hashCode() : 0);
        result = 31 * result + (group != null ? group.hashCode() : 0);
        return result;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Behavior behavior = (Behavior) o;
        if (startTimestamp != behavior.startTimestamp) return false;
        if (endTimestamp != behavior.endTimestamp) return false;
        if (Double.compare(behavior.count, count) != 0) return false;
        if (anomalyScore != behavior.anomalyScore) return false;
        if (adHashCode != behavior.adHashCode) return false;
        if (schema != null ? !schema.equals(behavior.schema) : behavior.schema != null) return false;
        if (detector != null ? !detector.equals(behavior.detector) : behavior.detector != null) return false;
        if (measure != null ? !measure.equals(behavior.measure) : behavior.measure != null) return false;
        if (field != null ? !field.equals(behavior.field) : behavior.field != null) return false;
        if (dimension != null ? !dimension.equals(behavior.dimension) : behavior.dimension != null) return false;
        if (description != null ? !description.equals(behavior.description) : behavior.description != null) return false;
        // Probably incorrect - comparing Object[] arrays with Arrays.equals
        if (!Arrays.equals(icons, behavior.icons)) return false;
        if (timeMap != null ? !timeMap.equals(behavior.timeMap) : behavior.timeMap != null) return false;
        if (user != null ? !user.equals(behavior.user) : behavior.user != null) return false;
        if (logQuery != null ? !logQuery.equals(behavior.logQuery) : behavior.logQuery != null) return false;
        return group != null ? group.equals(behavior.group) : behavior.group == null;
    }

    @Override
    public String toString() {
        return "Behavior{" +
                "schema='" + schema + '\'' +
                ", detector='" + detector + '\'' +
                ", measure='" + measure + '\'' +
                ", field='" + field + '\'' +
                ", dimension='" + dimension + '\'' +
                ", description='" + description + '\'' +
                ", icons=" + Arrays.toString(icons) +
                ", startTimestamp=" + startTimestamp +
                ", endTimestamp=" + endTimestamp +
                ", count=" + count +
                ", anomalyScore=" + anomalyScore +
                ", adHashCode=" + adHashCode +
                ", timeMap=" + timeMap +
                ", user='" + user + '\'' +
                ", logQuery=" + logQuery +
                ", group='" + group + '\'' +
                '}';
    }
}
{code}
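One observation that may be relevant (my note, not a confirmed diagnosis): {{hashCode()}} above is computed from mutable state ({{count}}, {{anomalyScore}}, {{endTimestamp}}, {{timeMap}}, {{user}}, {{group}}, ...). The CEP {{SharedBuffer}} resolves stored events through hash-based lookups, so if a {{Behavior}} is mutated after being stored, its hash no longer matches, which could produce exactly the "Could not find id for entry" precondition failure in the log. A minimal sketch of the changing hash:
{code:java}
// Sketch only; constructor arguments are illustrative values.
Behavior b = new Behavior("logon", "SlidingWindowAnomalyDetector", "activity", "Logoff",
        1465297200000L, 1465297203600L, 11.0, 100);
int before = b.hashCode();
b.add(1465297203600L, 1465297207200L, 2.0, 80); // mutates count, anomalyScore, timeMap, endTimestamp

// Prints false (barring a hash collision): the hash computed when the entry
// was stored no longer matches the mutated object.
System.out.println(before == b.hashCode());
{code}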
> RocksDB state backend Checkpointing is not working with KeyedCEP in 1.4
> -----------------------------------------------------------------------
>
> Key: FLINK-8248
> URL: https://issues.apache.org/jira/browse/FLINK-8248
> Project: Flink
> Issue Type: Bug
> Components: CEP, State Backends, Checkpointing
> Affects Versions: 1.4.0, 1.3.2
>                 Environment: Linux 3.10.0-514.el7.x86_64
>                 Flink:
>                 * version: 1.4
>                 * RocksDB state backend
>                 * checkpoint interval: 5 s
>                 * keyed CEP
>                 Language: Java 8
> Reporter: jia liu
>
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)