Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2202#discussion_r70450213
--- Diff:
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -454,4 +458,126 @@ static String generateStateName(final String name,
final int index) {
return name + "_" + index;
}
}
+
+ /**
+ * {@link TypeSerializer} for {@link NFA} that uses Java Serialization.
+ */
+ public static class Serializer<T> extends TypeSerializer<NFA<T>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer<NFA<T>> duplicate() {
+ return this;
+ }
+
+ @Override
+ public NFA<T> createInstance() {
+ return null;
+ }
+
+ @Override
+ public NFA<T> copy(NFA<T> from) {
+ try {
+ ByteArrayOutputStream baos = new
ByteArrayOutputStream();
+ ObjectOutputStream oos = new
ObjectOutputStream(baos);
+
+ oos.writeObject(from);
+
+ oos.close();
+ baos.close();
+
+ byte[] data = baos.toByteArray();
+
+ ByteArrayInputStream bais = new
ByteArrayInputStream(data);
+ ObjectInputStream ois = new
ObjectInputStream(bais);
+
+ @SuppressWarnings("unchecked")
+ NFA<T> copy = (NFA<T>) ois.readObject();
+ return copy;
+ } catch (IOException|ClassNotFoundException e) {
+ throw new RuntimeException("Could not copy
NFA.", e);
+ }
+ }
+
+ @Override
+ public NFA<T> copy(NFA<T> from, NFA<T> reuse) {
+ return copy(from);
+ }
+
+ @Override
+ public int getLength() {
+ return 0;
+ }
+
+ @Override
+ public void serialize(NFA<T> record, DataOutputView target)
throws IOException {
+ try {
+ ByteArrayOutputStream baos = new
ByteArrayOutputStream();
+ ObjectOutputStream oos = new
ObjectOutputStream(baos);
+
+ oos.writeObject(record);
+
+ oos.close();
+ baos.close();
+
+ byte[] data = baos.toByteArray();
+
+ target.writeInt(data.length);
+ target.write(data);
+ } catch (IOException e) {
+ throw new RuntimeException("Could not serialize
NFA.", e);
+ }
+ }
+
+ @Override
+ public NFA<T> deserialize(DataInputView source) throws
IOException {
+ try {
+ int size = source.readInt();
+
+ byte[] data = new byte[size];
+
+ source.readFully(data);
+
+ ByteArrayInputStream bais = new
ByteArrayInputStream(data);
+ ObjectInputStream ois = new
ObjectInputStream(bais);
+
+ @SuppressWarnings("unchecked")
+ NFA<T> copy = (NFA<T>) ois.readObject();
+ return copy;
+ } catch (IOException|ClassNotFoundException e) {
+ throw new RuntimeException("Could not
deserialize NFA.", e);
+ } }
+
+ @Override
+ public NFA<T> deserialize(NFA<T> reuse, DataInputView source)
throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target)
throws IOException {
+ int size = source.readInt();
+ target.writeInt(size);
+ target.write(source, size);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof Serializer;
--- End diff --
In order to make the equals relation symmetric, we should `return obj
instanceof Serializer && ((Serializer) obj).canEqual(this);`.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---