[
https://issues.apache.org/jira/browse/FLINK-1525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14536566#comment-14536566
]
ASF GitHub Bot commented on FLINK-1525:
---------------------------------------
GitHub user rmetzger opened a pull request:
https://github.com/apache/flink/pull/664
[FLINK-1525][FEEDBACK] Introduction of a small input parameter parsing utility
Hi,
last week I was running a bunch of Flink Streaming jobs on a cluster. One
of the jobs had 8 arguments which I changed in different iterations of the
program.
I ended up passing arguments like
```
16 1 8 3 10k
hdp22-w-1.c.internal:6667,hdp22-w-0.c.internal:6667,hdp22-m.c.internal:6667
10000
```
It's obvious that this is not easy to maintain.
In addition to this experience, I got similar feedback from at least two
other Flink users.
Therefore, I sat down and implemented a simple class which allows users to
work with input parameters in a hassle-free manner.
The tool is called **ParameterUtil**. It can be initialized from:
- regular command line arguments (`-` and `--`): `ParameterUtil.fromArgs(new String[]{"--berlin"});`
- `.properties` files: `ParameterUtil.fromPropertiesFile(propertiesFile);`
- system properties (`-D` arguments to the JVM): `ParameterUtil.fromSystemProperties();`
I'm also planning to provide an initializer which accepts the same
arguments as Hadoop's GenericOptionsParser:
https://hadoop.apache.org/docs/r1.0.4/api/org/apache/hadoop/util/GenericOptionsParser.html
(our users are just too used to Hadoop's tooling)
For accessing arguments, it has methods like `parameter.getRequired("input")`, `parameter.get("output", "myDefaultValue")`, `parameter.getLong("expectedCount", -1L)`, and so on.
Also, I added a method to export the parameters to Flink's `Configuration`
class:
```java
Configuration config = parameter.getConfiguration();
config.getLong("expectedCount", -1L);
```
This allows users to pass the input arguments to operators in the APIs:
```java
text.flatMap(new Tokenizer()).withParameters(config);
```
The `ParameterUtil` itself is Serializable, so it can be passed into user
functions (for example to the `Tokenizer`).
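The Serializable property matters because Flink ships function objects (and their fields) to the cluster via Java serialization. A minimal, self-contained sketch of that round trip, using a hypothetical `Params` holder rather than the real class:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

// Hypothetical stand-in for a Serializable parameter holder;
// not the actual Flink ParameterUtil class.
public class ParamsRoundTrip {
    static class Params implements Serializable {
        final HashMap<String, String> data = new HashMap<>();
    }

    // Serialize and deserialize, analogous to what happens when a function
    // holding the parameters is shipped to the cluster.
    static Params roundTrip(Params p) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(p);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Params) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Params p = new Params();
        p.data.put("normalization", "0.15");
        Params copy = roundTrip(p);
        System.out.println(copy.data.get("normalization")); // 0.15
    }
}
```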
Also, I extended the `ExecutionConfig` to allow passing a `UserConfig` holding custom user-defined values.
The `ParameterUtil` implements the `UserConfig` interface, so users can do the following:
```java
public static void main(String[] args) throws Exception {
    ParameterUtil pt = ParameterUtil.fromArgs(args);
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setUserConfig(pt);
    // ... regular Flink job setup ...
}
```
Inside a (rich) user function, users can access the command line arguments:
```java
text.flatMap(new Tokenizer())
    .flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
        @Override
        public void flatMap(Tuple2<String, Integer> value,
                            Collector<Tuple2<String, Integer>> out) throws Exception {
            ExecutionConfig.UserConfig uc = getRuntimeContext().getExecutionConfig().getUserConfig();
            ParameterUtil pt = (ParameterUtil) uc;
            float norm = pt.getFloat("normalization", 0.15f);
        }
    });
```
The `UserConfig` interface also allows exporting key/value pairs to the web interface.
Running WordCount:
```
/bin/flink run ./examples/flink-java-examples-0.9-SNAPSHOT-WordCount.jar --input /home/robert/incubator-flink/build-target/README.txt --output /tmp/wo
```
Will lead to the following result:

Before I go ahead and add this to all examples, I would like to get some feedback on the API choices I made (I don't want to change all the examples afterwards ;)).
WordCount currently looks like this:
```java
public static void main(String[] args) throws Exception {
    ParameterUtil pt = ParameterUtil.fromArgs(args);
    boolean fileOutput = pt.getNumberOfParameters() == 2;
    String textPath = null;
    String outputPath = null;
    if (fileOutput) {
        textPath = pt.getRequired("input");
        outputPath = pt.getRequired("output");
    }

    // set up the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setUserConfig(pt);

    // create the initial DataSet containing the text lines
    DataSet<String> text;
    if (fileOutput) {
        text = env.readTextFile(textPath);
    } else {
        // get default test text data
        text = WordCountData.getDefaultTextLineDataSet(env);
    }

    DataSet<Tuple2<String, Integer>> counts =
            // split up the lines into pairs (2-tuples) containing: (word, 1)
            text.flatMap(new Tokenizer())
            // group by the tuple field "0" and sum up tuple field "1"
            .groupBy(0)
            .sum(1);
    // ... (writing the output and calling env.execute() omitted)
}
```
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/rmetzger/flink flink1525
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/664.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #664
----
commit 95a55eb7c0acabebf9ac0decfb669f3da0b514b1
Author: Robert Metzger <[email protected]>
Date: 2015-05-07T20:49:33Z
Initial draft
commit e801dcd5e6f43139f748d20468129126e409c4f4
Author: Robert Metzger <[email protected]>
Date: 2015-05-07T20:51:27Z
wip
commit 7c92b11c0c02faa25c645a5c74c00e83c73b7492
Author: Robert Metzger <[email protected]>
Date: 2015-05-08T09:58:29Z
wip
commit edf6aef4368d7589d87d515572334bbc9c4f7a99
Author: Robert Metzger <[email protected]>
Date: 2015-05-08T14:29:58Z
integrated into web frontend
commit 8bdc8f092bfa6de2e26c36d0dfb976a3e9d59c89
Author: Robert Metzger <[email protected]>
Date: 2015-05-08T16:33:34Z
wip
commit c339a5e98dd57fbb6753f9db10c15db3737e02c8
Author: Robert Metzger <[email protected]>
Date: 2015-05-09T11:41:20Z
travis, give me some feedback
commit dd1f5029fa0efb792bdab94f54bca3c61a9d0f32
Author: Robert Metzger <[email protected]>
Date: 2015-05-09T11:45:57Z
starting to rework the examples
commit 6f03b1ad054a6346996f3b148e09e0b3101588d7
Author: Robert Metzger <[email protected]>
Date: 2015-05-09T13:53:05Z
wip
----
> Provide utils to pass -D parameters to UDFs
> --------------------------------------------
>
> Key: FLINK-1525
> URL: https://issues.apache.org/jira/browse/FLINK-1525
> Project: Flink
> Issue Type: Improvement
> Components: flink-contrib
> Reporter: Robert Metzger
> Labels: starter
>
> Hadoop users are used to setting job configuration through "-D" on the
> command line.
> Right now, Flink users have to manually parse command line arguments and pass
> them to their methods.
> It would be nice to provide a standard args parser which takes care of
> this.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)