Repository: flink
Updated Branches:
  refs/heads/master 7df6a3d72 -> 92947f0d8


[FLINK-1403] [streaming] Distributed filesystem support for streaming filesinks

Now streaming filesinks support the same filesystems as the batch ones


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dc0d81bb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dc0d81bb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dc0d81bb

Branch: refs/heads/master
Commit: dc0d81bb8bc056f9c8def5ce701c1aa121d2a13f
Parents: 7df6a3d
Author: szape <[email protected]>
Authored: Tue Jan 13 14:54:01 2015 +0100
Committer: mbalassi <[email protected]>
Committed: Wed Jan 21 16:05:25 2015 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    | 163 +++++++++++++++----
 .../api/function/sink/FileSinkFunction.java     | 118 ++++++++++++++
 .../function/sink/FileSinkFunctionByMillis.java |  59 +++++++
 .../api/function/sink/RichSinkFunction.java     |   2 +-
 .../api/function/sink/SinkFunction.java         |   2 +-
 5 files changed, 306 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dc0d81bb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 7d1659f..db644e9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.datastream;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -28,15 +29,20 @@ import 
org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.io.TextOutputFormat;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.JobGraphBuilder;
 import 
org.apache.flink.streaming.api.datastream.temporaloperator.StreamCrossOperator;
 import 
org.apache.flink.streaming.api.datastream.temporaloperator.StreamJoinOperator;
@@ -45,12 +51,9 @@ import 
org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
 import 
org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
 import 
org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
+import org.apache.flink.streaming.api.function.sink.FileSinkFunctionByMillis;
 import org.apache.flink.streaming.api.function.sink.PrintSinkFunction;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.api.function.sink.WriteFormat;
-import org.apache.flink.streaming.api.function.sink.WriteFormatAsCsv;
-import org.apache.flink.streaming.api.function.sink.WriteFormatAsText;
-import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByMillis;
 import org.apache.flink.streaming.api.invokable.SinkInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.invokable.operator.CounterInvokable;
@@ -897,20 +900,35 @@ public class DataStream<OUT> {
        }
 
        /**
+        * Writes a DataStream to the file specified by path in text format. For
+        * every element of the DataStream the result of {@link 
Object#toString()}
+        * is written.
+        * 
+        * @param path
+        *            the path pointing to the location the text file is 
written to
+        * 
+        * @return the closed DataStream.
+        */
+       public DataStreamSink<OUT> writeAsText(String path) {
+               return writeToFile(new TextOutputFormat<OUT>(new Path(path)), 
0L);
+       }
+
+       /**
         * Writes a DataStream to the file specified by path in text format. The
         * writing is performed periodically, in every millis milliseconds. For
         * every element of the DataStream the result of {@link 
Object#toString()}
         * is written.
         * 
         * @param path
-        *            is the path to the location where the tuples are written
+        *            the path pointing to the location the text file is 
written to
         * @param millis
-        *            is the file update frequency
+        *            the file update frequency
         * 
-        * @return The closed DataStream
+        * @return the closed DataStream
         */
        public DataStreamSink<OUT> writeAsText(String path, long millis) {
-               return writeAsText(path, new WriteFormatAsText<OUT>(), millis);
+               TextOutputFormat<OUT> tof = new TextOutputFormat<OUT>(new 
Path(path));
+               return writeToFile(tof, millis);
        }
 
        /**
@@ -919,64 +937,137 @@ public class DataStream<OUT> {
         * is written.
         * 
         * @param path
-        *            is the path to the location where the tuples are written
+        *            the path pointing to the location the text file is 
written to
+        * @param writeMode
+        *            Controls the behavior for existing files. Options are
+        *            NO_OVERWRITE and OVERWRITE.
         * 
-        * @return The closed DataStream
+        * @return the closed DataStream.
         */
-       public DataStreamSink<OUT> writeAsText(String path) {
-               return writeAsText(path, 0);
+       public DataStreamSink<OUT> writeAsText(String path, WriteMode 
writeMode) {
+               TextOutputFormat<OUT> tof = new TextOutputFormat<OUT>(new 
Path(path));
+               tof.setWriteMode(writeMode);
+               return writeToFile(tof, 0L);
        }
 
        /**
-        * Writes a DataStream to the file specified by path in text format. The
-        * writing is performed periodically, in every millis milliseconds. For
+        * Writes a DataStream to the file specified by path in text format. For
         * every element of the DataStream the result of {@link 
Object#toString()}
         * is written.
         * 
         * @param path
-        *            is the path to the location where the tuples are written
+        *            the path pointing to the location the text file is 
written to
+        * @param writeMode
+        *            Controls the behavior for existing files. Options are
+        *            NO_OVERWRITE and OVERWRITE.
         * @param millis
-        *            is the file update frequency
+        *            the file update frequency
         * 
-        * @return The closed DataStream
+        * @return the closed DataStream.
         */
-       public DataStreamSink<OUT> writeAsCsv(String path, long millis) {
-               if (!getType().isTupleType()) {
-                       throw new RuntimeException("Only tuple data streams can 
be written in csv format");
-               }
-               return writeAsText(path, new WriteFormatAsCsv<OUT>(), millis);
+       public DataStreamSink<OUT> writeAsText(String path, WriteMode 
writeMode, long millis) {
+               TextOutputFormat<OUT> tof = new TextOutputFormat<OUT>(new 
Path(path));
+               tof.setWriteMode(writeMode);
+               return writeToFile(tof, millis);
        }
 
        /**
-        * Writes a DataStream to the file specified by path in text format. For
+        * Writes a DataStream to the file specified by path in csv format. For
         * every element of the DataStream the result of {@link 
Object#toString()}
-        * is written.
+        * is written. This method can only be used on data streams of tuples.
         * 
         * @param path
-        *            is the path to the location where the tuples are written
+        *            the path pointing to the location the text file is 
written to
         * 
-        * @return The closed DataStream
+        * @return the closed DataStream
         */
-       public DataStreamSink<OUT> writeAsCsv(String path) {
-               return writeAsCsv(path, 0);
+       @SuppressWarnings("unchecked")
+       public <X extends Tuple> DataStreamSink<OUT> writeAsCsv(String path) {
+               Validate.isTrue(getType().isTupleType(),
+                               "The writeAsCsv() method can only be used on 
data sets of tuples.");
+               CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path),
+                               CsvOutputFormat.DEFAULT_LINE_DELIMITER, 
CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
+               return writeToFile((OutputFormat<OUT>) of, 0L);
        }
 
        /**
-        * Writes a DataStream to the file specified by path in text format. The
+        * Writes a DataStream to the file specified by path in csv format. The
         * writing is performed periodically, in every millis milliseconds. For
         * every element of the DataStream the result of {@link 
Object#toString()}
-        * is written.
+        * is written. This method can only be used on data streams of tuples.
         * 
         * @param path
-        *            is the path to the location where the tuples are written
+        *            the path pointing to the location the text file is 
written to
         * @param millis
-        *            is the file update frequency
+        *            the file update frequency
         * 
-        * @return the data stream constructed
+        * @return the closed DataStream
         */
-       private DataStreamSink<OUT> writeAsText(String path, WriteFormat<OUT> 
format, long millis) {
-               DataStreamSink<OUT> returnStream = addSink(new 
WriteSinkFunctionByMillis<OUT>(path, format,
-                               millis));
+       @SuppressWarnings("unchecked")
+       public <X extends Tuple> DataStreamSink<OUT> writeAsCsv(String path, 
long millis) {
+               Validate.isTrue(getType().isTupleType(),
+                               "The writeAsCsv() method can only be used on 
data sets of tuples.");
+               CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path),
+                               CsvOutputFormat.DEFAULT_LINE_DELIMITER, 
CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
+               return writeToFile((OutputFormat<OUT>) of, millis);
+       }
+
+       /**
+        * Writes a DataStream to the file specified by path in csv format. For
+        * every element of the DataStream the result of {@link 
Object#toString()}
+        * is written. This method can only be used on data streams of tuples.
+        * 
+        * @param path
+        *            the path pointing to the location the text file is 
written to
+        * @param writeMode
+        *            Controls the behavior for existing files. Options are
+        *            NO_OVERWRITE and OVERWRITE.
+        * 
+        * @return the closed DataStream
+        */
+       @SuppressWarnings("unchecked")
+       public <X extends Tuple> DataStreamSink<OUT> writeAsCsv(String path, 
WriteMode writeMode) {
+               Validate.isTrue(getType().isTupleType(),
+                               "The writeAsCsv() method can only be used on 
data sets of tuples.");
+               CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path),
+                               CsvOutputFormat.DEFAULT_LINE_DELIMITER, 
CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
+               if (writeMode != null) {
+                       of.setWriteMode(writeMode);
+               }
+               return writeToFile((OutputFormat<OUT>) of, 0L);
+       }
+
+       /**
+        * Writes a DataStream to the file specified by path in csv format. The
+        * writing is performed periodically, in every millis milliseconds. For
+        * every element of the DataStream the result of {@link 
Object#toString()}
+        * is written. This method can only be used on data streams of tuples.
+        * 
+        * @param path
+        *            the path pointing to the location the text file is 
written to
+        * @param writeMode
+        *            Controls the behavior for existing files. Options are
+        *            NO_OVERWRITE and OVERWRITE.
+        * @param millis
+        *            the file update frequency
+        * 
+        * @return the closed DataStream
+        */
+       @SuppressWarnings("unchecked")
+       public <X extends Tuple> DataStreamSink<OUT> writeAsCsv(String path, 
WriteMode writeMode,
+                       long millis) {
+               Validate.isTrue(getType().isTupleType(),
+                               "The writeAsCsv() method can only be used on 
data sets of tuples.");
+               CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path),
+                               CsvOutputFormat.DEFAULT_LINE_DELIMITER, 
CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
+               if (writeMode != null) {
+                       of.setWriteMode(writeMode);
+               }
+               return writeToFile((OutputFormat<OUT>) of, millis);
+       }
+
+       private DataStreamSink<OUT> writeToFile(OutputFormat<OUT> format, long 
millis) {
+               DataStreamSink<OUT> returnStream = addSink(new 
FileSinkFunctionByMillis<OUT>(format, millis));
                return returnStream;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dc0d81bb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
new file mode 100644
index 0000000..24beba1
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple implementation of the SinkFunction writing tuples in the specified
+ * OutputFormat format. Tuples are collected to a list and written to the file
+ * periodically. The target path and the overwrite mode are pre-packaged in
+ * format.
+ * 
+ * @param <IN>
+ *            Input type
+ */
+public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> {
+       private static final long serialVersionUID = 1L;
+       private static final Logger LOG = 
LoggerFactory.getLogger(FileSinkFunction.class);
+       protected ArrayList<IN> tupleList = new ArrayList<IN>();
+       protected volatile OutputFormat<IN> format;
+       protected volatile boolean cleanupCalled = false;
+       protected int indexInSubtaskGroup;
+       protected int currentNumberOfSubtasks;
+
+       public FileSinkFunction(OutputFormat<IN> format) {
+               this.format = format;
+       }
+
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               StreamingRuntimeContext context = (StreamingRuntimeContext) 
getRuntimeContext();
+               format.configure(context.getTaskStubParameters());
+               indexInSubtaskGroup = context.getIndexOfThisSubtask();
+               currentNumberOfSubtasks = context.getNumberOfParallelSubtasks();
+               format.open(indexInSubtaskGroup, currentNumberOfSubtasks);
+       }
+
+       @Override
+       public void invoke(IN record) throws Exception {
+               tupleList.add(record);
+               if (updateCondition()) {
+                       flush();
+               }
+       }
+
+       @Override
+       public void close() throws IOException {
+               if (!tupleList.isEmpty()) {
+                       flush();
+               }
+               try {
+                       format.close();
+               } catch (Exception ex) {
+                       try {
+                               if (!cleanupCalled && format instanceof 
CleanupWhenUnsuccessful) {
+                                       cleanupCalled = true;
+                                       ((CleanupWhenUnsuccessful) 
format).tryCleanupOnError();
+                               }
+                       } catch (Throwable t) {
+                               LOG.error("Cleanup on error failed.", t);
+                       }
+               }
+       }
+
+       protected void flush() {
+               try {
+                       for (IN rec : tupleList) {
+                               format.writeRecord(rec);
+                       }
+               } catch (Exception ex) {
+                       try {
+                               if (!cleanupCalled && format instanceof 
CleanupWhenUnsuccessful) {
+                                       cleanupCalled = true;
+                                       ((CleanupWhenUnsuccessful) 
format).tryCleanupOnError();
+                               }
+                       } catch (Throwable t) {
+                               LOG.error("Cleanup on error failed.", t);
+                       }
+               }
+               resetParameters();
+       }
+
+       /**
+        * Condition for writing the contents of tupleList and clearing it.
+        * 
+        * @return value of the updating condition
+        */
+       protected abstract boolean updateCondition();
+
+       /**
+        * Statements to be executed after writing a batch go here.
+        */
+       protected abstract void resetParameters();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dc0d81bb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java
new file mode 100644
index 0000000..f049a32
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import org.apache.flink.api.common.io.OutputFormat;
+
+/**
+ * Implementation of FileSinkFunction. Writes tuples to file every millis
+ * milliseconds.
+ * 
+ * @param <IN>
+ *            Input type
+ */
+public class FileSinkFunctionByMillis<IN> extends FileSinkFunction<IN> {
+       private static final long serialVersionUID = 1L;
+
+       private final long millis;
+       private long lastTime;
+
+       public FileSinkFunctionByMillis(OutputFormat<IN> format, long millis) {
+               super(format);
+               this.millis = millis;
+               lastTime = System.currentTimeMillis();
+       }
+
+       /**
+        * Condition for writing the contents of tupleList and clearing it.
+        * 
+        * @return value of the updating condition
+        */
+       @Override
+       protected boolean updateCondition() {
+               return System.currentTimeMillis() - lastTime >= millis;
+       }
+
+       /**
+        * Statements to be executed after writing a batch go here.
+        */
+       @Override
+       protected void resetParameters() {
+               tupleList.clear();
+               lastTime = System.currentTimeMillis();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dc0d81bb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java
index b89c169..3b8a4db 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java
@@ -23,6 +23,6 @@ public abstract class RichSinkFunction<IN> extends 
AbstractRichFunction implemen
 
        private static final long serialVersionUID = 1L;
 
-       public abstract void invoke(IN value);
+       public abstract void invoke(IN value) throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dc0d81bb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
index 9db268e..6097603 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
@@ -23,6 +23,6 @@ import org.apache.flink.api.common.functions.Function;
 
 public interface SinkFunction<IN> extends Function, Serializable {
 
-       public abstract void invoke(IN value);
+       public abstract void invoke(IN value) throws Exception;
 
 }

Reply via email to