[
https://issues.apache.org/jira/browse/FLINK-4149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372959#comment-15372959
]
ASF GitHub Bot commented on FLINK-4149:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2202#discussion_r70449634
--- Diff:
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -454,4 +458,126 @@ static String generateStateName(final String name,
final int index) {
return name + "_" + index;
}
}
+
+ /**
+ * {@link TypeSerializer} for {@link NFA} that uses Java Serialization.
+ */
+ public static class Serializer<T> extends TypeSerializer<NFA<T>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer<NFA<T>> duplicate() {
+ return this;
+ }
+
+ @Override
+ public NFA<T> createInstance() {
+ return null;
+ }
+
+ @Override
+ public NFA<T> copy(NFA<T> from) {
+ try {
+ ByteArrayOutputStream baos = new
ByteArrayOutputStream();
+ ObjectOutputStream oos = new
ObjectOutputStream(baos);
+
+ oos.writeObject(from);
+
+ oos.close();
+ baos.close();
+
+ byte[] data = baos.toByteArray();
+
+ ByteArrayInputStream bais = new
ByteArrayInputStream(data);
+ ObjectInputStream ois = new
ObjectInputStream(bais);
+
+ @SuppressWarnings("unchecked")
+ NFA<T> copy = (NFA<T>) ois.readObject();
+ return copy;
+ } catch (IOException|ClassNotFoundException e) {
+ throw new RuntimeException("Could not copy
NFA.", e);
+ }
+ }
+
+ @Override
+ public NFA<T> copy(NFA<T> from, NFA<T> reuse) {
+ return copy(from);
+ }
+
+ @Override
+ public int getLength() {
+ return 0;
+ }
+
+ @Override
+ public void serialize(NFA<T> record, DataOutputView target)
throws IOException {
+ try {
+ ByteArrayOutputStream baos = new
ByteArrayOutputStream();
+ ObjectOutputStream oos = new
ObjectOutputStream(baos);
+
+ oos.writeObject(record);
+
+ oos.close();
+ baos.close();
+
+ byte[] data = baos.toByteArray();
+
+ target.writeInt(data.length);
+ target.write(data);
--- End diff --
Can't we use the `DataOutputViewStream`?
```
ObjectOutputStream oos = new ObjectOutputStream(new
DataOutputViewStream(target));
oos.writeObject(record);
oos.close();
```
> Fix Serialization of NFA in AbstractKeyedCEPPatternOperator
> -----------------------------------------------------------
>
> Key: FLINK-4149
> URL: https://issues.apache.org/jira/browse/FLINK-4149
> Project: Flink
> Issue Type: Bug
> Components: CEP
> Affects Versions: 1.0.0, 1.1.0, 1.0.1, 1.0.2, 1.0.3
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Priority: Blocker
> Fix For: 1.1.0
>
>
> A job that uses CEP fails upon restore with a {{NullPointerException}} in
> {{NFA.process()}}. The reason seems to be that field {{computationStates}} is
> {{null}}. This field is transient and read in a custom {{readObject()}}
> method.
> In {{AbstractKeyedCEPPatternOperator}} this snipped is used to construct a
> {{StateDescriptor}} for an {{NFA}} state:
> {code}
> new ValueStateDescriptor<NFA<IN>>(
> NFA_OPERATOR_STATE_NAME,
> new KryoSerializer<NFA<IN>>((Class<NFA<IN>>) (Class<?>) NFA.class,
> getExecutionConfig()),
> null)
> {code}
> It seems Kryo does not invoke {{readObject}}/{{writeObject}}. We probably
> need a custom {{TypeSerializer}} for {{NFA}} to solve the problem.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)