Repository: flink
Updated Branches:
  refs/heads/master 39ec54ff1 -> a750415b6


[FLINK-2069] Fix Scala CSV Output Format

Closes #759


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a750415b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a750415b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a750415b

Branch: refs/heads/master
Commit: a750415b60a8f5bb150378fc49b5985c4ccb8c57
Parents: 39ec54f
Author: Aljoscha Krettek <[email protected]>
Authored: Tue Jun 2 16:37:06 2015 +0200
Committer: mbalassi <[email protected]>
Committed: Wed Jun 3 17:05:01 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    | 11 +++++-
 .../api/functions/sink/FileSinkFunction.java    |  1 +
 .../flink/streaming/api/scala/DataStream.scala  | 35 ++++++++++++++++----
 3 files changed, 40 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a750415b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 1ec440d..e0a8a7a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -1230,7 +1230,16 @@ public class DataStream<OUT> {
                return returnStream;
        }
 
-       private DataStreamSink<OUT> writeToFile(OutputFormat<OUT> format, long 
millis) {
+       /**
+        * Writes a DataStream using the given {@link OutputFormat}. The
+        * writing is performed periodically, in every millis milliseconds.
+        *
+        * @param format The output format that should be used for writing.
+        * @param millis the file update frequency
+        *
+        * @return the closed DataStream
+        */
+       public DataStreamSink<OUT> writeToFile(OutputFormat<OUT> format, long 
millis) {
                DataStreamSink<OUT> returnStream = addSink(new 
FileSinkFunctionByMillis<OUT>(format, millis));
                return returnStream;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a750415b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
index 43ee2a7..1cf5c07 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
@@ -105,6 +105,7 @@ public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> {
                        } catch (Throwable t) {
                                LOG.error("Cleanup on error failed.", t);
                        }
+                       throw new RuntimeException(ex);
                }
                resetParameters();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a750415b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index f32d0cc..3b2183b 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -18,6 +18,10 @@
 
 package org.apache.flink.streaming.api.scala
 
+import org.apache.flink.api.common.io.OutputFormat
+import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat
+import org.apache.flink.core.fs.{FileSystem, Path}
+
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
@@ -27,10 +31,10 @@ import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, 
GroupedDataStream, SingleOutputStreamOperator}
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, 
DataStreamSink, GroupedDataStream, SingleOutputStreamOperator}
 import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
 import org.apache.flink.streaming.api.functions.aggregation.SumFunction
-import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import 
org.apache.flink.streaming.api.functions.sink.{FileSinkFunctionByMillis, 
SinkFunction}
 import org.apache.flink.streaming.api.operators.{StreamGroupedReduce, 
StreamReduce}
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
@@ -728,8 +732,27 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * is written.
    *
    */
-  def writeAsCsv(path: String, millis: Long = 0): DataStream[T] =
-    javaStream.writeAsCsv(path, millis)
+  def writeAsCsv(
+      path: String,
+      millis: Long = 0,
+      rowDelimiter: String = ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER,
+      fieldDelimiter: String = ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER,
+      writeMode: FileSystem.WriteMode = null): DataStream[T] = {
+    require(javaStream.getType.isTupleType, "CSV output can only be used with 
Tuple DataSets.")
+    val of = new ScalaCsvOutputFormat[Product](new Path(path), rowDelimiter, 
fieldDelimiter)
+    if (writeMode != null) {
+      of.setWriteMode(writeMode)
+    }
+    javaStream.writeToFile(of.asInstanceOf[OutputFormat[T]], millis)
+  }
+
+  /**
+   * Writes a DataStream using the given [[OutputFormat]]. The
+   * writing is performed periodically, in every millis milliseconds.
+   */
+  def writeToFile(format: OutputFormat[T], millis: Long): DataStreamSink[T] = {
+    javaStream.writeToFile(format, millis)
+  }
 
   /**
    * Writes the DataStream to a socket as a byte array. The format of the 
output is
@@ -744,8 +767,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * method is called.
    *
    */
-  def addSink(sinkFuntion: SinkFunction[T]): DataStream[T] =
-    javaStream.addSink(sinkFuntion)
+  def addSink(sinkFunction: SinkFunction[T]): DataStream[T] =
+    javaStream.addSink(sinkFunction)
 
   /**
    * Adds the given sink to this DataStream. Only streams with sinks added

Reply via email to