Repository: flink
Updated Branches:
  refs/heads/master 844d5e2c4 -> 9a18e5790


[yarn] [python] Adds -Xms start option for yarn, fixes python test case to 
overwrite files, fixes python docs

Fixes failing yarn test cases


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9a18e579
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9a18e579
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9a18e579

Branch: refs/heads/master
Commit: 9a18e579021304f6ee0687cd1c9579740b11b98d
Parents: 844d5e2
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Tue Apr 28 22:30:20 2015 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Wed Apr 29 00:11:49 2015 +0200

----------------------------------------------------------------------
 docs/apis/python.md                                              | 2 +-
 .../flink/languagebinding/api/python/flink/test/test_csv.py      | 3 ++-
 .../main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java   | 4 ++--
 .../scala/org/apache/flink/yarn/ApplicationMasterActor.scala     | 3 ++-
 4 files changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9a18e579/docs/apis/python.md
----------------------------------------------------------------------
diff --git a/docs/apis/python.md b/docs/apis/python.md
index 5bea544..14585fc 100644
--- a/docs/apis/python.md
+++ b/docs/apis/python.md
@@ -125,7 +125,7 @@ of these methods on DataSet:
 
 {% highlight python %}
 data.write_text("<file-path>", WriteMode=Constants.NO_OVERWRITE)
-write_csv("<file-path>", WriteMode=Constants.NO_OVERWRITE, 
line_delimiter='\n', field_delimiter=',')
+write_csv("<file-path>", line_delimiter='\n', field_delimiter=',', 
write_mode=Constants.NO_OVERWRITE)
 output()
 {% endhighlight %}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9a18e579/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_csv.py
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_csv.py
 
b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_csv.py
index c48179b..e9fa822 100644
--- 
a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_csv.py
+++ 
b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_csv.py
@@ -17,13 +17,14 @@
 
################################################################################
 from flink.plan.Environment import get_environment
 from flink.plan.Constants import INT, STRING
+from flink.plan.Constants import WriteMode
 
 if __name__ == "__main__":
     env = get_environment()
 
     d1 = 
env.read_csv("src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_csv",
 (INT, INT, STRING))
 
-    d1.write_csv("/tmp/flink/result")
+    d1.write_csv("/tmp/flink/result", line_delimiter="\n", 
field_delimiter="|", write_mode=WriteMode.OVERWRITE)
 
     env.set_degree_of_parallelism(1)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9a18e579/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 2edab17..8c4dc4c 100644
--- 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -514,8 +514,8 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
                        Assert.assertNotNull("Unable to locate JobManager log", 
jobmanagerLog);
                        content = FileUtils.readFileToString(jobmanagerLog);
                        // expecting 512 mb, because TM was started with 1024, 
we cut off 50% (NOT THE DEFAULT VALUE).
-                       Assert.assertTrue("Expected string 'Starting TM with 
command=$JAVA_HOME/bin/java -Xmx512m' not found in JobManager log: 
'"+jobmanagerLog+"'",
-                                       content.contains("Starting TM with 
command=$JAVA_HOME/bin/java -Xmx512m"));
+                       Assert.assertTrue("Expected string 'Starting TM with 
command=$JAVA_HOME/bin/java -Xms512m -Xmx512m' not found in JobManager log: 
'"+jobmanagerLog+"'",
+                                       content.contains("Starting TM with 
command=$JAVA_HOME/bin/java -Xms512m -Xmx512m"));
                        Assert.assertTrue("Expected string ' (2/2) (attempt #0) 
to ' not found in JobManager log." +
                                                        "This string checks 
that the job has been started with a parallelism of 2. Log contents: 
'"+jobmanagerLog+"'",
                                        content.contains(" (2/2) (attempt #0) 
to "));

http://git-wip-us.apache.org/repos/asf/flink/blob/9a18e579/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
index 4285da7..f384130 100644
--- 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
+++ 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
@@ -506,7 +506,8 @@ trait ApplicationMasterActor extends ActorLogMessages {
     val ctx = Records.newRecord(classOf[ContainerLaunchContext])
 
     val javaOpts = 
flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")
-    val tmCommand = new StringBuilder(s"$$JAVA_HOME/bin/java -Xmx${heapLimit}m 
$javaOpts")
+    val tmCommand = new StringBuilder(s"$$JAVA_HOME/bin/java -Xms${heapLimit}m 
" +
+      s"-Xmx${heapLimit}m $javaOpts")
 
     if (hasLogback || hasLog4j) {
       tmCommand ++=

Reply via email to