Repository: flink Updated Branches: refs/heads/master 39ec54ff1 -> a750415b6
[FLINK-2069] Fix Scala CSV Output Format Closes #759 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a750415b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a750415b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a750415b Branch: refs/heads/master Commit: a750415b60a8f5bb150378fc49b5985c4ccb8c57 Parents: 39ec54f Author: Aljoscha Krettek <[email protected]> Authored: Tue Jun 2 16:37:06 2015 +0200 Committer: mbalassi <[email protected]> Committed: Wed Jun 3 17:05:01 2015 +0200 ---------------------------------------------------------------------- .../streaming/api/datastream/DataStream.java | 11 +++++- .../api/functions/sink/FileSinkFunction.java | 1 + .../flink/streaming/api/scala/DataStream.scala | 35 ++++++++++++++++---- 3 files changed, 40 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a750415b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 1ec440d..e0a8a7a 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -1230,7 +1230,16 @@ public class DataStream<OUT> { return returnStream; } - private DataStreamSink<OUT> writeToFile(OutputFormat<OUT> format, long millis) { + /** + * Writes a DataStream using the given {@link OutputFormat}. The + * writing is performed periodically, in every millis milliseconds. + * + * @param format The output format that should be used for writing. + * @param millis the file update frequency + * + * @return the closed DataStream + */ + public DataStreamSink<OUT> writeToFile(OutputFormat<OUT> format, long millis) { DataStreamSink<OUT> returnStream = addSink(new FileSinkFunctionByMillis<OUT>(format, millis)); return returnStream; } http://git-wip-us.apache.org/repos/asf/flink/blob/a750415b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java index 43ee2a7..1cf5c07 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java @@ -105,6 +105,7 @@ public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> { } catch (Throwable t) { LOG.error("Cleanup on error failed.", t); } + throw new RuntimeException(ex); } resetParameters(); } http://git-wip-us.apache.org/repos/asf/flink/blob/a750415b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index f32d0cc..3b2183b 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -18,6 +18,10 @@ package org.apache.flink.streaming.api.scala +import org.apache.flink.api.common.io.OutputFormat +import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat +import org.apache.flink.core.fs.{FileSystem, Path} + import scala.collection.JavaConverters._ import scala.reflect.ClassTag @@ -27,10 +31,10 @@ import org.apache.flink.api.java.functions.KeySelector import org.apache.flink.api.java.typeutils.TupleTypeInfoBase import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator import org.apache.flink.streaming.api.collector.selector.OutputSelector -import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, GroupedDataStream, SingleOutputStreamOperator} +import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, DataStreamSink, GroupedDataStream, SingleOutputStreamOperator} import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType import org.apache.flink.streaming.api.functions.aggregation.SumFunction -import org.apache.flink.streaming.api.functions.sink.SinkFunction +import org.apache.flink.streaming.api.functions.sink.{FileSinkFunctionByMillis, SinkFunction} import org.apache.flink.streaming.api.operators.{StreamGroupedReduce, StreamReduce} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean import org.apache.flink.streaming.api.windowing.helper.WindowingHelper @@ -728,8 +732,27 @@ class DataStream[T](javaStream: JavaStream[T]) { * is written. * */ - def writeAsCsv(path: String, millis: Long = 0): DataStream[T] = - javaStream.writeAsCsv(path, millis) + def writeAsCsv( + path: String, + millis: Long = 0, + rowDelimiter: String = ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER, + fieldDelimiter: String = ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER, + writeMode: FileSystem.WriteMode = null): DataStream[T] = { + require(javaStream.getType.isTupleType, "CSV output can only be used with Tuple DataSets.") + val of = new ScalaCsvOutputFormat[Product](new Path(path), rowDelimiter, fieldDelimiter) + if (writeMode != null) { + of.setWriteMode(writeMode) + } + javaStream.writeToFile(of.asInstanceOf[OutputFormat[T]], millis) + } + + /** + * Writes a DataStream using the given [[OutputFormat]]. The + * writing is performed periodically, in every millis milliseconds. + */ + def writeToFile(format: OutputFormat[T], millis: Long): DataStreamSink[T] = { + javaStream.writeToFile(format, millis) + } /** * Writes the DataStream to a socket as a byte array. The format of the output is @@ -744,8 +767,8 @@ class DataStream[T](javaStream: JavaStream[T]) { * method is called. * */ - def addSink(sinkFuntion: SinkFunction[T]): DataStream[T] = - javaStream.addSink(sinkFuntion) + def addSink(sinkFunction: SinkFunction[T]): DataStream[T] = + javaStream.addSink(sinkFunction) /** * Adds the given sink to this DataStream. Only streams with sinks added
