[ https://issues.apache.org/jira/browse/FLINK-7943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16234836#comment-16234836 ]
ASF GitHub Bot commented on FLINK-7943:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4921#discussion_r148398599
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameters.java ---
@@ -83,82 +82,100 @@ public void add(Option option) throws RequiredParametersException {
	 * <p>If any check fails, a RequiredParametersException is thrown
	 *
	 * @param parameterTool - parameters supplied by the user.
+	 * @return New ParameterTool instance with default values set
	 * @throws RequiredParametersException if any of the specified checks fail
	 */
-	public void applyTo(ParameterTool parameterTool) throws RequiredParametersException {
+	public ParameterTool applyTo(ParameterTool parameterTool) throws RequiredParametersException {
		List<String> missingArguments = new LinkedList<>();
+
+		ParameterTool resultParameterTool = parameterTool;
		for (Option o : data.values()) {
-			if (parameterTool.data.containsKey(o.getName())) {
-				if (Objects.equals(parameterTool.data.get(o.getName()), ParameterTool.NO_VALUE_KEY)) {
+			if (resultParameterTool.has(o.getName())) {
+				if (Objects.equals(resultParameterTool.get(o.getName()), ParameterTool.NO_VALUE_KEY)) {
					// the parameter has been passed, but no value, check if there is a default value
-					checkAndApplyDefaultValue(o, parameterTool.data);
+					resultParameterTool = checkAndApplyDefaultValue(o, resultParameterTool);
				} else {
					// a value has been passed in the parameterTool, now check if it adheres to all constraints
-					checkAmbiguousValues(o, parameterTool.data);
-					checkIsCastableToDefinedType(o, parameterTool.data);
-					checkChoices(o, parameterTool.data);
+					checkAmbiguousValues(o, resultParameterTool);
+					checkIsCastableToDefinedType(o, resultParameterTool);
+					checkChoices(o, resultParameterTool);
				}
			} else {
				// check if there is a default name or a value passed for a possibly defined alternative name.
-				if (hasNoDefaultValueAndNoValuePassedOnAlternativeName(o, parameterTool.data)) {
+				resultParameterTool = synchronizeAlternativeName(o, resultParameterTool);
+
+				if (!resultParameterTool.has(o.getName()) || Objects.equals(resultParameterTool.get(o.getName()), ParameterTool.NO_VALUE_KEY)) {
					missingArguments.add(o.getName());
				}
			}
		}
		if (!missingArguments.isEmpty()) {
			throw new RequiredParametersException(this.missingArgumentsText(missingArguments), missingArguments);
		}
+
+		return resultParameterTool;
	}

	// check if the given parameter has a default value and add it to the passed map if that is the case
	// else throw an exception
-	private void checkAndApplyDefaultValue(Option o, Map<String, String> data) throws RequiredParametersException {
-		if (hasNoDefaultValueAndNoValuePassedOnAlternativeName(o, data)) {
+	private ParameterTool checkAndApplyDefaultValue(Option o, ParameterTool parameterTool) throws RequiredParametersException {
+		final ParameterTool resultParameterTool = synchronizeAlternativeName(o, parameterTool);
+
+		if (!resultParameterTool.has(o.getName()) || Objects.equals(resultParameterTool.get(o.getName()), ParameterTool.NO_VALUE_KEY)) {
			throw new RequiredParametersException("No default value for undefined parameter " + o.getName());
+		} else {
+			return resultParameterTool;
		}
	}

	// check if the value in the given map which corresponds to the name of the given option
	// is castable to the type of the option (if any is defined)
-	private void checkIsCastableToDefinedType(Option o, Map<String, String> data) throws RequiredParametersException {
-		if (o.hasType() && !o.isCastableToDefinedType(data.get(o.getName()))) {
+	private void checkIsCastableToDefinedType(Option o, ParameterTool parameterTool) throws RequiredParametersException {
+		if (o.hasType() && !o.isCastableToDefinedType(parameterTool.get(o.getName()))) {
			throw new RequiredParametersException("Value for parameter " + o.getName() +
					" cannot be cast to type " + o.getType());
		}
	}

	// check if the value in the given map which corresponds to the name of the given option
	// adheres to the list of given choices for the param in the options (if any are defined)
-	private void checkChoices(Option o, Map<String, String> data) throws RequiredParametersException {
-		if (o.getChoices().size() > 0 && !o.getChoices().contains(data.get(o.getName()))) {
-			throw new RequiredParametersException("Value " + data.get(o.getName()) +
+	private void checkChoices(Option o, ParameterTool parameterTool) throws RequiredParametersException {
+		if (o.getChoices().size() > 0 && !o.getChoices().contains(parameterTool.get(o.getName()))) {
+			throw new RequiredParametersException("Value " + parameterTool.get(o.getName()) +
					" is not in the list of valid choices for key " + o.getName());
		}
	}

	// move value passed on alternative name to standard name or apply default value if any defined
-	// else return true to indicate parameter is 'really' missing
-	private boolean hasNoDefaultValueAndNoValuePassedOnAlternativeName(Option o, Map<String, String> data)
-			throws RequiredParametersException {
-		if (o.hasAlt() && data.containsKey(o.getAlt())) {
-			data.put(o.getName(), data.get(o.getAlt()));
+	// if any change was applied, then this method returns a new ParameterTool instance with these
+	// changes. If not, then the passed ParameterTool instance will be returned.
+	private ParameterTool synchronizeAlternativeName(Option o, ParameterTool parameterTool) {
+		// TODO: Throw this all away!!!
+		if (o.hasAlt() && parameterTool.has(o.getAlt())) {
+			HashMap<String, String> newData = new HashMap<>(parameterTool.toMap());
+			newData.put(o.getName(), parameterTool.get(o.getAlt()));
+
+			return ParameterTool.fromMap(newData);
		} else {
			if (o.hasDefaultValue()) {
-				data.put(o.getName(), o.getDefaultValue());
+				HashMap<String, String> newData = new HashMap<>(parameterTool.toMap());
--- End diff ---
Yes, we could. But I didn't want to touch more than necessary. I think this is
actually dead code and should be removed.
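
For context, a minimal sketch of how a caller would consume the changed
{{applyTo}}, assuming the signature from the diff above (the class name and the
{{input}} option are made up for illustration):
{code}
import org.apache.flink.api.java.utils.Option;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.api.java.utils.RequiredParameters;

public class ApplyToUsage {
	public static void main(String[] args) throws Exception {
		ParameterTool parameters = ParameterTool.fromArgs(args);

		RequiredParameters required = new RequiredParameters();
		required.add(new Option("input").defaultValue("/tmp/input"));

		// Previously, applyTo mutated the ParameterTool that was passed in.
		// With the new signature it returns a (possibly new) instance that
		// carries the applied defaults, so callers must use the return value.
		ParameterTool resolved = required.applyTo(parameters);

		System.out.println(resolved.get("input"));
	}
}
{code}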
> OptionalDataException when launching Flink jobs concurrently
> ------------------------------------------------------------
>
> Key: FLINK-7943
> URL: https://issues.apache.org/jira/browse/FLINK-7943
> Project: Flink
> Issue Type: Bug
> Components: Client
> Affects Versions: 1.4.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
>
> A user reported that he is getting an {{OptionalDataException}} when he
> launches multiple Flink jobs from the same program concurrently. The problem
> seems to appear if one sets the {{GlobalJobParameters}}. The stack trace can
> be found below:
> {code}
> Failed to submit job 60f4da5cf76836fe52ceba5cebdae412 (Union4a:14:15)
> java.io.OptionalDataException
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1588)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
> 	at java.util.HashMap.readObject(HashMap.java:1407)
> 	at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2173)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
> 	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
> 	at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
> 	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1283)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
> 	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> 	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
> 	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> 	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> 	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> 	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> 	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
> 	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
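> The failure is consistent with the shared parameter map being mutated by one
> thread while another thread serializes it for job submission: {{HashMap}}
> writes its size and then its entries, so a concurrent modification can produce
> a byte stream whose entry count does not match, which then surfaces as an
> {{OptionalDataException}} in {{HashMap.readObject}} on the receiving side.
> A minimal, Flink-independent sketch of that hazard (illustrative only, and
> nondeterministic, so it may need several runs to trigger):
> {code}
> import java.io.ByteArrayInputStream;
> import java.io.ByteArrayOutputStream;
> import java.io.ObjectInputStream;
> import java.io.ObjectOutputStream;
> import java.util.HashMap;
> import java.util.Map;
>
> public class ConcurrentSerializationHazard {
> 	public static void main(String[] args) throws Exception {
> 		Map<String, String> shared = new HashMap<>();
> 		for (int i = 0; i < 10000; i++) {
> 			shared.put("key" + i, "value" + i);
> 		}
>
> 		// One thread keeps mutating the map...
> 		Thread mutator = new Thread(() -> {
> 			for (int i = 0; i < 1000000; i++) {
> 				shared.put("extra" + (i % 100), "v" + i);
> 				shared.remove("extra" + ((i + 50) % 100));
> 			}
> 		});
> 		mutator.start();
>
> 		// ...while this thread serializes and deserializes it.
> 		try {
> 			ByteArrayOutputStream bos = new ByteArrayOutputStream();
> 			try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
> 				oos.writeObject(shared);
> 			}
> 			try (ObjectInputStream ois = new ObjectInputStream(
> 					new ByteArrayInputStream(bos.toByteArray()))) {
> 				ois.readObject();
> 			}
> 		} catch (Exception e) {
> 			// e.g. OptionalDataException or StreamCorruptedException,
> 			// depending on the timing of the concurrent mutation
> 			System.err.println("Corrupted serialization: " + e);
> 		}
> 		mutator.join();
> 	}
> }
> {code}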
> The user code causing the problem is:
> {code}
> @SuppressWarnings("serial")
> public class UnionThreaded {
> 	static int ThreadPoolSize = 3;
> 	static int JobsPerThread = 2;
> 	static ParameterTool params;
>
> 	public static class RunSubset implements Runnable {
> 		private int start = 0;
> 		private int end = 0;
>
> 		RunSubset(int start, int end) {
> 			this.start = start;
> 			this.end = end;
> 		}
>
> 		@Override
> 		public void run() {
> 			// set up the execution environment
> 			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> 			// make parameters available in the web interface
> 			env.getConfig().setGlobalJobParameters(params);
>
> 			if (params.has("left") && params.has("right")) {
> 				for (int i = start; i < end; i++) {
> 					DataSet<DeviceRecord> l, r;
> 					DataSet<DeviceRecord1> j;
> 					DataSet<Tuple2<Integer, Integer>> c1, c2;
>
> 					r = env.readCsvFile(params.get("right") + "/" + Integer.toString(i))
> 						.pojoType(DeviceRecord.class, "A", "B", "C")
> 						.setParallelism(1)
> 						.filter(new MyFilter())
> 						.setParallelism(1);
>
> 					// read the text file from given input path
> 					j = env.readCsvFile(params.get("left") + "/" + Integer.toString(i))
> 						.pojoType(DeviceRecord.class, "A", "B", "C")
> 						.setParallelism(1)
> 						.leftOuterJoin(r)
> 						.where("B")
> 						.equalTo("B")
> 						.with(new MyFlatJoinFunction()).setParallelism(1);
>
> 					j.flatMap(new Mapper(false))
> 						.groupBy(0)
> 						.sum(1).setParallelism(1)
> 						.writeAsCsv(params.get("output") + "/" + Integer.toString(i), "\n", ",");
>
> 					j.flatMap(new Mapper2(true))
> 						.groupBy(0)
> 						.sum(1).setParallelism(1)
> 						.writeAsCsv(params.get("output2") + "/" + Integer.toString(i), "\n", ",");
> 				}
> 			}
>
> 			try {
> 				System.out.println("calling env.execute()"); // + Calendar.getInstance().getTime();
> 				env.execute("Union4a" + ":" + Integer.toString(start) + ":" + Integer.toString(end));
> 			} catch (Exception e) {
> 				System.err.println("env.execute exception: " + e.getMessage());
> 			}
> 		}
> 	}
>
> 	// *************************************************************************
> 	// PROGRAM
> 	// *************************************************************************
>
> 	public static void main(String[] args) throws Exception {
> 		params = ParameterTool.fromArgs(args);
>
> 		// set up the execution environment
> 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> 		// make parameters available in the web interface
> 		env.getConfig().setGlobalJobParameters(params);
>
> 		int total_to_do = Integer.decode(params.get("filecount"));
>
> 		// number of threads should be <= number of slots
> 		ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(ThreadPoolSize);
>
> 		// assumes an even number of jobs
> 		for (int i = 0; i < total_to_do; i += JobsPerThread) {
> 			int end = i + JobsPerThread;
> 			if (end > total_to_do) {
> 				end = total_to_do;
> 			}
> 			executor.execute(new RunSubset(i, end));
> 		}
>
> 		executor.shutdown();
>
> 		// Many ways of waiting.
> 		try {
> 			executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
> 		} catch (InterruptedException e) {
> 			System.out.println("Execution interrupted");
> 			System.exit(-1);
> 		}
>
> 		// get input data
> 		DataSet<Tuple2<Integer, Integer>> counts;
> 		DataSet<Tuple2<Integer, Integer>> counts2;
>
> 		counts = env.readCsvFile(params.get("output"))
> 			.types(Integer.class, Integer.class);
> 		counts2 = env.readCsvFile(params.get("output2"))
> 			.types(Integer.class, Integer.class);
>
> 		// Count by C
> 		counts = counts
> 			.groupBy(0)
> 			.sum(1);
>
> 		// Count by device
> 		counts2 = counts2
> 			.groupBy(0)
> 			.sum(1);
>
> 		// emit result
> 		if (params.has("output")) {
> 			counts.writeAsCsv(params.get("output3"), "\n", ", ");
> 		}
> 		// emit result
> 		if (params.has("output2")) {
> 			counts2.writeAsCsv(params.get("output4"), "\n", ", ");
> 		}
>
> 		// execute program
> 		env.execute("Union4b");
> 	}
> }
> {code}
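> As a workaround until the fix is in, each thread could register its own copy
> of the parameters, so that the instance set as {{GlobalJobParameters}} is
> never shared between concurrently submitted jobs. A sketch (untested), using
> only {{ParameterTool.toMap()}} and {{ParameterTool.fromMap()}}:
> {code}
> // inside RunSubset.run(), instead of setGlobalJobParameters(params):
> ParameterTool privateParams = ParameterTool.fromMap(new HashMap<>(params.toMap()));
> env.getConfig().setGlobalJobParameters(privateParams);
> {code}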
> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Execute-multiple-jobs-in-parallel-threading-java-io-OptionalDataException-td16441.html
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)