Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1905#discussion_r60393928
--- Diff:
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
---
@@ -99,10 +116,26 @@
null,
false);
+ return flatSelect(patternFlatSelectFunction, outTypeInfo);
+ }
+
+ /**
+ * Applies a flat select function to the detected pattern sequence. For
each pattern sequence
+ * the provided {@link PatternFlatSelectFunction} is called. The
pattern flat select function
+ * can produce an arbitrary number of resulting elements.
+ *
+ * @param patternFlatSelectFunction The pattern flat select function
which is called for each
+ * detected pattern sequence.
+ * @param <R> Typ of the resulting elements
+ * @param outTypeInfo Explicit specification of output type.
+ * @return {@link DataStream} which contains the resulting elements
from the pattern flat select
+ * function.
+ */
+ public <R> DataStream<R> flatSelect(final PatternFlatSelectFunction<T,
R> patternFlatSelectFunction, TypeInformation<R> outTypeInfo) {
return patternStream.flatMap(
- new PatternFlatSelectMapper<T, R>(
-
patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction)
- )).returns(outTypeInfo);
+ new PatternFlatSelectMapper<T, R>(
+
patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction)
+ )).returns(outTypeInfo);
}
--- End diff --
Indentation off in this file.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---