[ https://issues.apache.org/jira/browse/FLINK-3203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15083355#comment-15083355 ]
Omar Alvarez edited comment on FLINK-3203 at 1/5/16 5:23 PM:
-------------------------------------------------------------
Thanks for the quick answer.
I have tested the PageRank, WordCount and ConnectedComponents examples, and I
have also manually implemented the WordCount example in Java, exactly as I did
in Python. None of the Java programs I tried produce this error, but I am not
100% sure whether they use the distributed cache.
As you asked, I have tested a distributed cache program in Java, and it also
fails. The program:
{code:title=DistributedCache.java|borderStyle=solid}
// create the file that will be cached
textPath = createTempFile("count.txt");

// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// register the file with the distributed cache
env.registerCachedFile(textPath, "cache_test");

// read the same file as input; WordChecker.open() fetches "cache_test"
List<Tuple1<String>> result = env
    .readTextFile(textPath)
    .flatMap(new WordChecker())
    .collect();
{code}
Output:
{noformat}
java.lang.Exception: The user defined 'open(Configuration)' method in class org.myorg.quickstart.Job$WordChecker caused an exception: An error occurred while copying the file.
    at org.apache.flink.runtime.operators.BatchTask.openUserCode(BatchTask.java:1368)
    at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.openTask(ChainedFlatMapDriver.java:47)
    at org.apache.flink.runtime.operators.BatchTask.openChainedTasks(BatchTask.java:1408)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:130)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: An error occurred while copying the file.
    at org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78)
    at org.myorg.quickstart.Job$WordChecker.open(Job.java:89)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.runtime.operators.BatchTask.openUserCode(BatchTask.java:1366)
    ... 5 more
Caused by: java.io.FileNotFoundException: File file:/tmp/flink_test_889697207182217374.tmp does not exist or the user running Flink ('omar.alvarez') has insufficient permissions to access it.
    at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:107)
    at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:242)
    at org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:322)
    at org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:306)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more
{noformat}
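The FileNotFoundException above names two possible causes: the file does not
exist on the node where the task runs, or the user cannot read it. A minimal
sketch for telling these apart, assuming it is run on each compute node (for
example through the same qsub submission), could look like the following. The
class name CacheFileCheck and the default path are hypothetical and only serve
as an illustration; the check uses plain JDK calls and no Flink API.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical diagnostic helper, not part of Flink.
class CacheFileCheck {

    // Outcomes that mirror the two causes named in the exception message.
    enum Status { READABLE, MISSING, NOT_READABLE }

    // Reports whether the path exists and is readable for the current user
    // on the machine where this code runs.
    static Status check(Path path) {
        if (!Files.exists(path)) {
            return Status.MISSING;
        }
        if (!Files.isReadable(path)) {
            return Status.NOT_READABLE;
        }
        return Status.READABLE;
    }

    // Host name of the current node, so output from several nodes can be compared.
    static String hostName() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            return "unknown-host";
        }
    }

    public static void main(String[] args) {
        // Assumed default path; pass the real temp file path as the first argument.
        Path path = Paths.get(args.length > 0 ? args[0] : "/tmp/flink_test_example.tmp");
        System.out.println(hostName() + ": " + path + " -> " + check(path));
    }
}
```

If the node that submitted the job prints READABLE and a node that runs the
task prints MISSING, the path is probably local to the submitting node rather
than shared, which would be consistent with the trace above.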
was (Author: omaralvarez):
Thanks for the quick answer.
I am still a newbie, so if you can point me to a program that uses the
distributed cache, I will certainly try it. I have tested the PageRank,
WordCount and ConnectedComponents examples, and I have also manually
implemented the WordCount example in Java, exactly as I did in Python. None of
the Java programs I tried produce this error, but I am not 100% sure whether
they use the distributed cache.
> Python API crashing when run in OGS
> -----------------------------------
>
> Key: FLINK-3203
> URL: https://issues.apache.org/jira/browse/FLINK-3203
> Project: Flink
> Issue Type: Bug
> Components: Python API
> Affects Versions: 0.10.0
> Environment: Rocks 6.1 SP1, CentOS release 6.7
> (2.6.32-573.7.1.el6.x86_64), java/oraclejdk/1.8.0_45, Python 2.6.6
> Reporter: Omar Alvarez
>
> When trying to execute the Python example without HDFS, the FlatMap fails
> with the following error:
> {code:title=PyExample|borderStyle=solid}
> 01/05/2016 13:09:38 Job execution switched to status RUNNING.
> 01/05/2016 13:09:38 DataSource (ValueSource)(1/1) switched to SCHEDULED
> 01/05/2016 13:09:38 DataSource (ValueSource)(1/1) switched to DEPLOYING
> 01/05/2016 13:09:38 DataSource (ValueSource)(1/1) switched to RUNNING
> 01/05/2016 13:09:38 MapPartition (PythonFlatMap -> PythonCombine)(1/1) switched to SCHEDULED
> 01/05/2016 13:09:38 MapPartition (PythonFlatMap -> PythonCombine)(1/1) switched to DEPLOYING
> 01/05/2016 13:09:38 DataSource (ValueSource)(1/1) switched to FINISHED
> 01/05/2016 13:09:38 MapPartition (PythonFlatMap -> PythonCombine)(1/1) switched to RUNNING
> 01/05/2016 13:09:38 MapPartition (PythonFlatMap -> PythonCombine)(1/1) switched to FAILED
> java.lang.Exception: The user defined 'open()' method caused an exception: An error occurred while copying the file.
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:484)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: An error occurred while copying the file.
>     at org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78)
>     at org.apache.flink.languagebinding.api.java.python.streaming.PythonStreamer.startPython(PythonStreamer.java:68)
>     at org.apache.flink.languagebinding.api.java.python.streaming.PythonStreamer.setupProcess(PythonStreamer.java:58)
>     at org.apache.flink.languagebinding.api.java.common.streaming.Streamer.open(Streamer.java:67)
>     at org.apache.flink.languagebinding.api.java.python.functions.PythonMapPartition.open(PythonMapPartition.java:47)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>     ... 3 more
> Caused by: java.io.FileNotFoundException: File file:/tmp/flink does not exist or the user running Flink ('omar.alvarez') has insufficient permissions to access it.
>     at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:107)
>     at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:242)
>     at org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:322)
>     at org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:306)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     ... 1 more
> {code}
> It is important to mention that I am using modified Flink cluster launch
> scripts in order to use the OGS engine. The modified scripts and a usage
> example can be found at https://github.com/omaralvarez/flink-OGS-GE.
> The same example in the Java API works correctly, and the user has sufficient
> permissions to write the file. If I use interactive nodes instead of the qsub
> command to run the example, it does not fail.