pnowojski commented on a change in pull request #10084: [FLINK-14382][yarn]
Incorrect handling of FLINK_PLUGINS_DIR on Yarn
URL: https://github.com/apache/flink/pull/10084#discussion_r344688121
##########
File path:
flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
##########
@@ -99,7 +118,28 @@ public static void main(String[] args) throws Exception {
* FlatMapFunction. The function takes a line (String) and splits it
into
* multiple pairs in the form of "(word,1)" ({@code Tuple2<String,
Integer>}).
*/
- public static final class Tokenizer implements FlatMapFunction<String,
Tuple2<String, Integer>> {
+ public static final class Tokenizer extends RichFlatMapFunction<String,
Tuple2<String, Integer>> {
+
+ private final Collection<String> inputFiles;
+
+ public Tokenizer() {
+ this(null);
+ }
+
+ public Tokenizer(@Nullable Collection<String> inputFiles) {
+ this.inputFiles = inputFiles;
+ }
+
+ @Override
+ public void open(Configuration conf) throws IOException {
+ if (inputFiles != null && inputFiles.size() > 0) {
+ List<Object> classCollection = new
ArrayList<>(inputFiles.size());
+ for (String input : inputFiles) {
+
classCollection.add(FileSystem.getUnguardedFileSystem(URI.create(input)));
+ }
+
PluginUtils.checkClassIsolationInDifferentPlugins(classCollection);
Review comment:
Those `checkClassIsolationInDifferentPlugins ` (both in batch and streaming
`WordCount`) checks shouldn't happen in the examples, but in the dummy file
system implementations. Otherwise they will be confusing people reading those
code examples.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services