[ https://issues.apache.org/jira/browse/FLINK-8248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16296123#comment-16296123 ]
jia liu edited comment on FLINK-8248 at 12/19/17 3:29 AM:
----------------------------------------------------------
It happened again, so I read the source code and found what may be a bug in
*org.apache.flink.cep.nfa.SharedBufferSerializer.serialize()*.
The first loop does not add the *SharedBufferEntry* objects referenced by the
*edges* to *entryIDs*, but the second loop looks them up in *entryIDs* at line 942.
{code:java}
@Override
public void serialize(SharedBuffer<K, V> record, DataOutputView
target) throws IOException {
Map<K, SharedBufferPage<K, V>> pages = record.pages;
Map<SharedBufferEntry<K, V>, Integer> entryIDs = new
HashMap<>();
int totalEdges = 0;
int entryCounter = 0;
// number of pages
target.writeInt(pages.size());
for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry:
pages.entrySet()) {
SharedBufferPage<K, V> page =
pageEntry.getValue();
// key for the current page
keySerializer.serialize(page.getKey(), target);
// number of page entries
target.writeInt(page.entries.size());
for (Map.Entry<ValueTimeWrapper<V>,
SharedBufferEntry<K, V>> sharedBufferEntry: page.entries.entrySet()) {
SharedBufferEntry<K, V> sharedBuffer =
sharedBufferEntry.getValue();
// assign id to the sharedBufferEntry
for the future
// serialization of the previous
relation
entryIDs.put(sharedBuffer,
entryCounter++);
ValueTimeWrapper<V> valueTimeWrapper =
sharedBuffer.getValueTime();
valueSerializer.serialize(valueTimeWrapper.getValue(), target);
target.writeLong(valueTimeWrapper.getTimestamp());
target.writeInt(valueTimeWrapper.getCounter());
int edges = sharedBuffer.edges.size();
totalEdges += edges;
target.writeInt(sharedBuffer.referenceCounter);
}
}
// write the edges between the shared buffer entries
target.writeInt(totalEdges);
for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry:
pages.entrySet()) {
SharedBufferPage<K, V> page =
pageEntry.getValue();
for (Map.Entry<ValueTimeWrapper<V>,
SharedBufferEntry<K, V>> sharedBufferEntry: page.entries.entrySet()) {
SharedBufferEntry<K, V> sharedBuffer =
sharedBufferEntry.getValue();
Integer id = entryIDs.get(sharedBuffer);
Preconditions.checkState(id != null,
"Could not find id for entry: " + sharedBuffer + "pages: " + pages + "entryIDs:
" + entryIDs);
for (SharedBufferEdge<K, V> edge:
sharedBuffer.edges) {
// in order to serialize the
previous relation we simply serialize the ids
// of the source and target
SharedBufferEntry
if (edge.target != null) {
Integer targetId =
entryIDs.get(edge.getTarget());
Preconditions.checkState(targetId != null,
"Could
not find id for entry: " + edge.getTarget() + "pages: " + pages + "entryIDs: "
+ entryIDs);
target.writeInt(id);
target.writeInt(targetId);
versionSerializer.serialize(edge.version, target);
} else {
target.writeInt(id);
target.writeInt(-1);
versionSerializer.serialize(edge.version, target);
}
}
}
}
}
{code}
Data example:
{noformat}
SharedBufferEntry(ValueTimeWrapper(Behavior{schema='device',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Connect',
field='activity', user='RHR0856', count=23.0, anomalyScore=100,
startTimestamp=1511395200000, endTimestamp=1511395203600}, 1511409599999, 0),
[SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=12.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511366399999, 0),
[SharedBufferEdge(null, 16)], 1), 16.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=22.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511387999999, 0),
[SharedBufferEdge(null, 27)], 1), 27.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=18.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511402399999, 0),
[SharedBufferEdge(null, 36)], 1), 36.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511337599999, 0),
[SharedBufferEdge(null, 2)], 1), 2.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=12.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511355599999, 0),
[SharedBufferEdge(null, 10)], 1), 10.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=12.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511359199999, 0),
[SharedBufferEdge(null, 12)], 1), 12.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=22.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511395199999, 0),
[SharedBufferEdge(null, 31)], 1), 31.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511377199999, 0),
[SharedBufferEdge(null, 21)], 1), 21.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511351999999, 0),
[SharedBufferEdge(null, 7)], 1), 7.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511359199999, 0),
[SharedBufferEdge(null, 11)], 1), 11.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511380799999, 0),
[SharedBufferEdge(null, 23)], 1), 23.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=18.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511391599999, 0),
[SharedBufferEdge(null, 30)], 1), 30.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511366399999, 0),
[SharedBufferEdge(null, 15)], 1), 15.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=12.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511348399999, 0),
[SharedBufferEdge(null, 6)], 1), 6.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511373599999, 0),
[SharedBufferEdge(null, 19)], 1), 19.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=12.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511362799999, 0),
[SharedBufferEdge(null, 14)], 1), 14.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=12.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511380799999, 0),
[SharedBufferEdge(null, 24)], 1), 24.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=18.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511395199999, 0),
[SharedBufferEdge(null, 32)], 1), 32.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511348399999, 0),
[SharedBufferEdge(null, 5)], 1), 5.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=12.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511373599999, 0),
[SharedBufferEdge(null, 20)], 1), 20.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=18.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511387999999, 0),
[SharedBufferEdge(null, 28)], 1), 28.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511341199999, 0),
[SharedBufferEdge(null, 3)], 1), 3.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=12.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511377199999, 0),
[SharedBufferEdge(null, 22)], 1), 22.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=22.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511398799999, 0),
[SharedBufferEdge(null, 33)], 1), 33.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=12.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511351999999, 0),
[SharedBufferEdge(null, 8)], 1), 8.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511362799999, 0),
[SharedBufferEdge(null, 13)], 1), 13.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=22.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511402399999, 0),
[SharedBufferEdge(null, 35)], 1), 35.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511355599999, 0),
[SharedBufferEdge(null, 9)], 1), 9.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511344799999, 0),
[SharedBufferEdge(null, 4)], 1), 4.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511333999999, 0),
[SharedBufferEdge(null, 1)], 1), 1.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=12.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511369999999, 0),
[SharedBufferEdge(null, 18)], 1), 18.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511369999999, 0),
[SharedBufferEdge(null, 17)], 1), 17.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=22.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511391599999, 0),
[SharedBufferEdge(null, 29)], 1), 29.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=18.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511405999999, 0),
[SharedBufferEdge(null, 38)], 1), 38.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=12.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511384399999, 0),
[SharedBufferEdge(null, 26)], 1), 26.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=18.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511398799999, 0),
[SharedBufferEdge(null, 34)], 1), 34.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511384399999, 0),
[SharedBufferEdge(null, 25)], 1), 25.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=22.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511405999999, 0),
[SharedBufferEdge(null, 37)], 1), 37.0)], 10)=2
{noformat}
was (Author: sonice_lj):
It happened again, so I read the source code and found what may be a bug in
*org.apache.flink.cep.nfa.SharedBufferSerializer.serialize()*.
The first loop does not add the *SharedBufferEntry* objects referenced by the
*edges* to *entryIDs*, but they are looked up in *entryIDs* at line 942.
{code:java}
@Override
public void serialize(SharedBuffer<K, V> record, DataOutputView
target) throws IOException {
Map<K, SharedBufferPage<K, V>> pages = record.pages;
Map<SharedBufferEntry<K, V>, Integer> entryIDs = new
HashMap<>();
int totalEdges = 0;
int entryCounter = 0;
// number of pages
target.writeInt(pages.size());
for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry:
pages.entrySet()) {
SharedBufferPage<K, V> page =
pageEntry.getValue();
// key for the current page
keySerializer.serialize(page.getKey(), target);
// number of page entries
target.writeInt(page.entries.size());
for (Map.Entry<ValueTimeWrapper<V>,
SharedBufferEntry<K, V>> sharedBufferEntry: page.entries.entrySet()) {
SharedBufferEntry<K, V> sharedBuffer =
sharedBufferEntry.getValue();
// assign id to the sharedBufferEntry
for the future
// serialization of the previous
relation
entryIDs.put(sharedBuffer,
entryCounter++);
ValueTimeWrapper<V> valueTimeWrapper =
sharedBuffer.getValueTime();
valueSerializer.serialize(valueTimeWrapper.getValue(), target);
target.writeLong(valueTimeWrapper.getTimestamp());
target.writeInt(valueTimeWrapper.getCounter());
int edges = sharedBuffer.edges.size();
totalEdges += edges;
target.writeInt(sharedBuffer.referenceCounter);
}
}
// write the edges between the shared buffer entries
target.writeInt(totalEdges);
for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry:
pages.entrySet()) {
SharedBufferPage<K, V> page =
pageEntry.getValue();
for (Map.Entry<ValueTimeWrapper<V>,
SharedBufferEntry<K, V>> sharedBufferEntry: page.entries.entrySet()) {
SharedBufferEntry<K, V> sharedBuffer =
sharedBufferEntry.getValue();
Integer id = entryIDs.get(sharedBuffer);
Preconditions.checkState(id != null,
"Could not find id for entry: " + sharedBuffer + "pages: " + pages + "entryIDs:
" + entryIDs);
for (SharedBufferEdge<K, V> edge:
sharedBuffer.edges) {
// in order to serialize the
previous relation we simply serialize the ids
// of the source and target
SharedBufferEntry
if (edge.target != null) {
Integer targetId =
entryIDs.get(edge.getTarget());
Preconditions.checkState(targetId != null,
"Could
not find id for entry: " + edge.getTarget() + "pages: " + pages + "entryIDs: "
+ entryIDs);
target.writeInt(id);
target.writeInt(targetId);
versionSerializer.serialize(edge.version, target);
} else {
target.writeInt(id);
target.writeInt(-1);
versionSerializer.serialize(edge.version, target);
}
}
}
}
}
{code}
Data example:
{noformat}
SharedBufferEntry(ValueTimeWrapper(Behavior{schema='device',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Connect',
field='activity', user='RHR0856', count=23.0, anomalyScore=100,
startTimestamp=1511395200000, endTimestamp=1511395203600}, 1511409599999, 0),
[SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=12.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511366399999, 0),
[SharedBufferEdge(null, 16)], 1), 16.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=22.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511387999999, 0),
[SharedBufferEdge(null, 27)], 1), 27.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=18.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511402399999, 0),
[SharedBufferEdge(null, 36)], 1), 36.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511337599999, 0),
[SharedBufferEdge(null, 2)], 1), 2.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=12.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511355599999, 0),
[SharedBufferEdge(null, 10)], 1), 10.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=12.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511359199999, 0),
[SharedBufferEdge(null, 12)], 1), 12.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=22.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511395199999, 0),
[SharedBufferEdge(null, 31)], 1), 31.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511377199999, 0),
[SharedBufferEdge(null, 21)], 1), 21.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511351999999, 0),
[SharedBufferEdge(null, 7)], 1), 7.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511359199999, 0),
[SharedBufferEdge(null, 11)], 1), 11.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511380799999, 0),
[SharedBufferEdge(null, 23)], 1), 23.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=18.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511391599999, 0),
[SharedBufferEdge(null, 30)], 1), 30.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511366399999, 0),
[SharedBufferEdge(null, 15)], 1), 15.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=12.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511348399999, 0),
[SharedBufferEdge(null, 6)], 1), 6.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511373599999, 0),
[SharedBufferEdge(null, 19)], 1), 19.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=12.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511362799999, 0),
[SharedBufferEdge(null, 14)], 1), 14.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=12.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511380799999, 0),
[SharedBufferEdge(null, 24)], 1), 24.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=18.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511395199999, 0),
[SharedBufferEdge(null, 32)], 1), 32.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511348399999, 0),
[SharedBufferEdge(null, 5)], 1), 5.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=12.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511373599999, 0),
[SharedBufferEdge(null, 20)], 1), 20.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=18.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511387999999, 0),
[SharedBufferEdge(null, 28)], 1), 28.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511341199999, 0),
[SharedBufferEdge(null, 3)], 1), 3.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=12.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511377199999, 0),
[SharedBufferEdge(null, 22)], 1), 22.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=22.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511398799999, 0),
[SharedBufferEdge(null, 33)], 1), 33.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=12.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511351999999, 0),
[SharedBufferEdge(null, 8)], 1), 8.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511362799999, 0),
[SharedBufferEdge(null, 13)], 1), 13.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=22.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511402399999, 0),
[SharedBufferEdge(null, 35)], 1), 35.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511355599999, 0),
[SharedBufferEdge(null, 9)], 1), 9.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511344799999, 0),
[SharedBufferEdge(null, 4)], 1), 4.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511333999999, 0),
[SharedBufferEdge(null, 1)], 1), 1.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=12.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511369999999, 0),
[SharedBufferEdge(null, 18)], 1), 18.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511369999999, 0),
[SharedBufferEdge(null, 17)], 1), 17.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=22.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511391599999, 0),
[SharedBufferEdge(null, 29)], 1), 29.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=18.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511405999999, 0),
[SharedBufferEdge(null, 38)], 1), 38.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=12.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511384399999, 0),
[SharedBufferEdge(null, 26)], 1), 26.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logoff',
field='activity', user='RHR0856', count=18.0, anomalyScore=100,
startTimestamp=1511344800000, endTimestamp=1511344803600}, 1511398799999, 0),
[SharedBufferEdge(null, 34)], 1), 34.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=16.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511384399999, 0),
[SharedBufferEdge(null, 25)], 1), 25.0),
SharedBufferEdge(SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
detector='SlidingWindowAnomalyDetector', measure='count', dimension='Logon',
field='activity', user='RHR0856', count=22.0, anomalyScore=100,
startTimestamp=1511330400000, endTimestamp=1511330403600}, 1511405999999, 0),
[SharedBufferEdge(null, 37)], 1), 37.0)], 10)=2
{noformat}
> RocksDB state backend Checkpointing is not working with KeyedCEP in 1.4
> -----------------------------------------------------------------------
>
> Key: FLINK-8248
> URL: https://issues.apache.org/jira/browse/FLINK-8248
> Project: Flink
> Issue Type: Bug
> Components: CEP, State Backends, Checkpointing
> Affects Versions: 1.4.0, 1.3.2
> Environment: linux: 3.10.0-514.el7.x86_64
> flink:
> * version: 1.4
> * RocksDB state backend
> * checkpoint interval 5s
> * keyed cep
> language: Java8
> Reporter: jia liu
>
> Here is my exception log:
> {code:java}
> java.lang.RuntimeException: Exception occurred while processing valve output
> watermark:
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:291)
> at
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
> at
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error while adding data to RocksDB
> at
> org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:103)
> at
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.updateNFA(AbstractKeyedCEPPatternOperator.java:309)
> at
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:247)
> at
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:277)
> at
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:886)
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
> ... 7 more
> Caused by: java.lang.IllegalStateException: Could not find id for entry:
> SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon',
> detector='SlidingWindowAnomalyDetector', measure='count', field='activity',
> dimension='Logoff', description='null', icons=null,
> startTimestamp=1465297200000, endTimestamp=1465297203600, count=11.0,
> anomalyScore=100, adHashCode=-1866791453, timeMap={1465297200000=11.0},
> user='LMR0049', logQuery=null, group='null'}, 1465300799999, 0),
> [SharedBufferEdge(null, 199)], 1)
> at
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> at
> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:943)
> at
> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:806)
> at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
> at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
> at
> org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:100)
> ... 13 more
> {code}
> Main job code:
> {code:java}
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
> env.setStateBackend(new
> RocksDBStateBackend(getString("flink.backend-state-dir")));
> // .........
> DataStream<Behavior> behaviorStream = anomalyStream
> .assignTimestampsAndWatermarks(new
> AnomalyTimestampExtractor(Time.seconds(0)))
> .keyBy((KeySelector<AnomalySlice, String>) value ->
> value.entity)
>
> .window(SlidingEventTimeWindows.of(Time.seconds(getLong("flink.window.window-size")),
> Time.seconds(getLong("flink.window.slice-size"))))
> .apply(new BehaviorBuilderFunction())
> .filter(new WhitelistFilterFunction())
> // a non-keyed stream will result in a pattern operator parallelism equal to 1.
> .keyBy((KeySelector<Behavior, String>) Behavior::getUser);
> // cep on behavior stream
> List<Pattern> allPatterns = PatternsHolder.getAllPatterns();
> for (Pattern pa : allPatterns) {
> PatternStream<Behavior> ps = CEP.pattern(behaviorStream, pa);
> ps.select(new AlertGenerator(pa.getName())).name(pa.getName());
> }
> {code}
> keyed stream event:
> {code:java}
> public class Behavior implements Serializable {
> private static final long serialVersionUID = 7786674623147772721L;
> static int ANOMALY_SCORE_THRESHOLD = 40;
> static int ANOMALY_COUNT_THRESHOLD = 3;
> public final String schema;
> public final String detector;
> private String measure = UEBAConstants.DEFAULT_MEASURE_FIELD;
> public final String dimension;
> public final String field; //dim value
> private String user;
> public String group;
> public double count;
> public int anomalyScore;
> protected String description;
> private Icon[] icons;
> private int adHashCode;
> private long startTimestamp;
> private long endTimestamp;
> private Map<Long, Double> timeMap;
> public ArrayList<HashMap<String, Object>> logQuery;
> public Behavior(String schema, String detector, String field, String
> dimension, String user,
> long fromMillis, long toMillis, double count, int
> anomalyScore, ArrayList<HashMap<String,
> Object>> logQuery) {
> this.schema = schema;
> this.detector = detector;
> this.field = field;
> this.dimension = dimension;
> this.user = user;
> this.startTimestamp = fromMillis;
> this.endTimestamp = toMillis;
> this.count = count;
> this.anomalyScore = anomalyScore;
> this.logQuery = logQuery;
> timeMap = new HashMap<>();
> timeMap.put(fromMillis, count);
> }
> public Behavior(String schema, String detector, String field, String
> dimension,
> long fromMillis, long toMillis, double count, int
> anomalyScore) {
> this.schema = schema;
> this.detector = detector;
> this.field = field;
> this.dimension = dimension;
> this.startTimestamp = fromMillis;
> this.endTimestamp = toMillis;
> this.count = count;
> this.anomalyScore = anomalyScore;
> timeMap = new HashMap<>();
> timeMap.put(fromMillis, count);
> }
> public String getGroup() {
> return group;
> }
> public void setGroup(String group) {
> this.group = group;
> }
> public void setAdHashCode(int hashCode) {
> this.adHashCode = hashCode;
> }
> public void setMeasure(String measure) {
> this.measure = measure;
> }
> public String getMeasure() {
> return measure;
> }
> // anomalyScore is computed as a count-weighted average, which may not be wise.
> public void add(long fromMillis, long toMillis, double count, int
> anomalyScore, ArrayList<HashMap<String,
> Object>> logQuery) {
> double sum = this.count * this.anomalyScore + count * anomalyScore;
> this.count += count;
> this.anomalyScore = (int) (sum / this.count);
> if (fromMillis < this.startTimestamp) {
> this.startTimestamp = fromMillis;
> }
> if (toMillis > this.endTimestamp) {
> this.endTimestamp = toMillis;
> }
> if (!timeMap.containsKey(fromMillis)) {
> timeMap.put(fromMillis, 0.0);
> }
> timeMap.put(fromMillis, timeMap.get(fromMillis) + count);
> if (logQuery != null) {
> this.logQuery.addAll(logQuery);
> }
> }
> public void add(long fromMillis, long toMillis, double count, int
> anomalyScore) {
> double sum = this.count * this.anomalyScore + count * anomalyScore;
> this.count += count;
> this.anomalyScore = (int) (sum / this.count);
> if (fromMillis < this.startTimestamp) {
> this.startTimestamp = fromMillis;
> }
> if (toMillis > this.endTimestamp) {
> this.endTimestamp = toMillis;
> }
> if (!timeMap.containsKey(fromMillis)) {
> timeMap.put(fromMillis, 0.0);
> }
> timeMap.put(fromMillis, timeMap.get(fromMillis) + count);
> }
> public Long[] getTimestamps() {
> return timeMap.keySet().toArray(new Long[timeMap.size()]);
> }
> public String dimension() {
> return dimension;
> }
> public long startTimestamp() {
> return startTimestamp;
> }
> public long endTimestamp() {
> return endTimestamp;
> }
> public double count() {
> return count;
> }
> public int anomalyScore() {
> return anomalyScore;
> }
> public boolean isAnomaly() {
> return anomalyScore() >= ANOMALY_SCORE_THRESHOLD && count() >=
> ANOMALY_COUNT_THRESHOLD;
> }
> public String getUser() {
> return user;
> }
> public void setUser(String user) {
> this.user = user;
> }
> public void describeAs(String description, Icon... icons) {
> this.description = description;
> this.icons = icons;
> }
> public String setVisualizeInterfaceParameter(String group, long
> visualizeStartTimestamp, long
> visualizeEndTimestamp) {
> String requestParameterString = "/get_alert_visualize?detectorName="
> + detector + "&groupField=" + group +
> "&user=" + user + "&field=" + field + "&measureField=" +
> measure + "&schemaName=" + schema +
> "&dimensionField=" + dimension + "&visualizeStartTimestamp="
> + visualizeStartTimestamp +
> "&visualizeEndTimestamp=" + visualizeEndTimestamp;
> return requestParameterString;
> }
> @Override
> public int hashCode() {
> int result;
> long temp;
> result = schema != null ? schema.hashCode() : 0;
> result = 31 * result + (detector != null ? detector.hashCode() : 0);
> result = 31 * result + (measure != null ? measure.hashCode() : 0);
> result = 31 * result + (field != null ? field.hashCode() : 0);
> result = 31 * result + (dimension != null ? dimension.hashCode() : 0);
> result = 31 * result + (description != null ? description.hashCode()
> : 0);
> result = 31 * result + Arrays.hashCode(icons);
> result = 31 * result + (int) (startTimestamp ^ (startTimestamp >>>
> 32));
> result = 31 * result + (int) (endTimestamp ^ (endTimestamp >>> 32));
> temp = Double.doubleToLongBits(count);
> result = 31 * result + (int) (temp ^ (temp >>> 32));
> result = 31 * result + anomalyScore;
> result = 31 * result + adHashCode;
> result = 31 * result + (timeMap != null ? timeMap.hashCode() : 0);
> result = 31 * result + (user != null ? user.hashCode() : 0);
> result = 31 * result + (logQuery != null ? logQuery.hashCode() : 0);
> result = 31 * result + (group != null ? group.hashCode() : 0);
> return result;
> }
> @Override
> public boolean equals(Object o) {
> if (this == o) return true;
> if (o == null || getClass() != o.getClass()) return false;
> Behavior behavior = (Behavior) o;
> if (startTimestamp != behavior.startTimestamp) return false;
> if (endTimestamp != behavior.endTimestamp) return false;
> if (Double.compare(behavior.count, count) != 0) return false;
> if (anomalyScore != behavior.anomalyScore) return false;
> if (adHashCode != behavior.adHashCode) return false;
> if (schema != null ? !schema.equals(behavior.schema) :
> behavior.schema != null)
> return false;
> if (detector != null ? !detector.equals(behavior.detector) :
> behavior.detector != null)
> return false;
> if (measure != null ? !measure.equals(behavior.measure) :
> behavior.measure != null)
> return false;
> if (field != null ? !field.equals(behavior.field) : behavior.field !=
> null) return false;
> if (dimension != null ? !dimension.equals(behavior.dimension) :
> behavior.dimension != null)
> return false;
> if (description != null ? !description.equals(behavior.description) :
> behavior.description != null)
> return false;
> // Probably incorrect - comparing Object[] arrays with Arrays.equals
> if (!Arrays.equals(icons, behavior.icons)) return false;
> if (timeMap != null ? !timeMap.equals(behavior.timeMap) :
> behavior.timeMap != null)
> return false;
> if (user != null ? !user.equals(behavior.user) : behavior.user !=
> null) return false;
> if (logQuery != null ? !logQuery.equals(behavior.logQuery) :
> behavior.logQuery != null)
> return false;
> return group != null ? group.equals(behavior.group) : behavior.group
> == null;
> }
> @Override
> public String toString() {
> return "Behavior{" +
> "schema='" + schema + '\'' +
> ", detector='" + detector + '\'' +
> ", measure='" + measure + '\'' +
> ", field='" + field + '\'' +
> ", dimension='" + dimension + '\'' +
> ", description='" + description + '\'' +
> ", icons=" + Arrays.toString(icons) +
> ", startTimestamp=" + startTimestamp +
> ", endTimestamp=" + endTimestamp +
> ", count=" + count +
> ", anomalyScore=" + anomalyScore +
> ", adHashCode=" + adHashCode +
> ", timeMap=" + timeMap +
> ", user='" + user + '\'' +
> ", logQuery=" + logQuery +
> ", group='" + group + '\'' +
> '}';
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)