Repository: flink Updated Branches: refs/heads/master 7df6a3d72 -> 92947f0d8
[FLINK-1403] [streaming] Distributed filesystem support for streaming filesinks Now streaming filesinks support the same filesystems as the batch ones Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dc0d81bb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dc0d81bb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dc0d81bb Branch: refs/heads/master Commit: dc0d81bb8bc056f9c8def5ce701c1aa121d2a13f Parents: 7df6a3d Author: szape <[email protected]> Authored: Tue Jan 13 14:54:01 2015 +0100 Committer: mbalassi <[email protected]> Committed: Wed Jan 21 16:05:25 2015 +0100 ---------------------------------------------------------------------- .../streaming/api/datastream/DataStream.java | 163 +++++++++++++++---- .../api/function/sink/FileSinkFunction.java | 118 ++++++++++++++ .../function/sink/FileSinkFunctionByMillis.java | 59 +++++++ .../api/function/sink/RichSinkFunction.java | 2 +- .../api/function/sink/SinkFunction.java | 2 +- 5 files changed, 306 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/dc0d81bb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 7d1659f..db644e9 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -20,6 +20,7 @@ package 
org.apache.flink.streaming.api.datastream; import java.util.ArrayList; import java.util.List; +import org.apache.commons.lang3.Validate; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; @@ -28,15 +29,20 @@ import org.apache.flink.api.common.functions.RichFilterFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.io.CsvOutputFormat; +import org.apache.flink.api.java.io.TextOutputFormat; import org.apache.flink.api.java.operators.Keys; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.core.fs.FileSystem.WriteMode; +import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.JobGraphBuilder; import org.apache.flink.streaming.api.datastream.temporaloperator.StreamCrossOperator; import org.apache.flink.streaming.api.datastream.temporaloperator.StreamJoinOperator; @@ -45,12 +51,9 @@ import org.apache.flink.streaming.api.function.aggregation.AggregationFunction; import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType; import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator; import org.apache.flink.streaming.api.function.aggregation.SumAggregator; +import 
org.apache.flink.streaming.api.function.sink.FileSinkFunctionByMillis; import org.apache.flink.streaming.api.function.sink.PrintSinkFunction; import org.apache.flink.streaming.api.function.sink.SinkFunction; -import org.apache.flink.streaming.api.function.sink.WriteFormat; -import org.apache.flink.streaming.api.function.sink.WriteFormatAsCsv; -import org.apache.flink.streaming.api.function.sink.WriteFormatAsText; -import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByMillis; import org.apache.flink.streaming.api.invokable.SinkInvokable; import org.apache.flink.streaming.api.invokable.StreamInvokable; import org.apache.flink.streaming.api.invokable.operator.CounterInvokable; @@ -897,20 +900,35 @@ public class DataStream<OUT> { } /** + * Writes a DataStream to the file specified by path in text format. For + * every element of the DataStream the result of {@link Object#toString()} + * is written. + * + * @param path + * the path pointing to the location the text file is written to + * + * @return the closed DataStream. + */ + public DataStreamSink<OUT> writeAsText(String path) { + return writeToFile(new TextOutputFormat<OUT>(new Path(path)), 0L); + } + + /** * Writes a DataStream to the file specified by path in text format. The * writing is performed periodically, in every millis milliseconds. For * every element of the DataStream the result of {@link Object#toString()} * is written. 
* * @param path - * is the path to the location where the tuples are written + * the path pointing to the location the text file is written to * @param millis - * is the file update frequency + * the file update frequency * - * @return The closed DataStream + * @return the closed DataStream */ public DataStreamSink<OUT> writeAsText(String path, long millis) { - return writeAsText(path, new WriteFormatAsText<OUT>(), millis); + TextOutputFormat<OUT> tof = new TextOutputFormat<OUT>(new Path(path)); + return writeToFile(tof, millis); } /** @@ -919,64 +937,137 @@ public class DataStream<OUT> { * is written. * * @param path - * is the path to the location where the tuples are written + * the path pointing to the location the text file is written to + * @param writeMode + * Controls the behavior for existing files. Options are + * NO_OVERWRITE and OVERWRITE. * - * @return The closed DataStream + * @return the closed DataStream. */ - public DataStreamSink<OUT> writeAsText(String path) { - return writeAsText(path, 0); + public DataStreamSink<OUT> writeAsText(String path, WriteMode writeMode) { + TextOutputFormat<OUT> tof = new TextOutputFormat<OUT>(new Path(path)); + tof.setWriteMode(writeMode); + return writeToFile(tof, 0L); } /** - * Writes a DataStream to the file specified by path in text format. The - * writing is performed periodically, in every millis milliseconds. For + * Writes a DataStream to the file specified by path in text format. For * every element of the DataStream the result of {@link Object#toString()} * is written. * * @param path - * is the path to the location where the tuples are written + * the path pointing to the location the text file is written to + * @param writeMode + * Controls the behavior for existing files. Options are + * NO_OVERWRITE and OVERWRITE. * @param millis - * is the file update frequency + * the file update frequency * - * @return The closed DataStream + * @return the closed DataStream.
*/ - public DataStreamSink<OUT> writeAsCsv(String path, long millis) { - if (!getType().isTupleType()) { - throw new RuntimeException("Only tuple data streams can be written in csv format"); - } - return writeAsText(path, new WriteFormatAsCsv<OUT>(), millis); + public DataStreamSink<OUT> writeAsText(String path, WriteMode writeMode, long millis) { + TextOutputFormat<OUT> tof = new TextOutputFormat<OUT>(new Path(path)); + tof.setWriteMode(writeMode); + return writeToFile(tof, millis); } /** - * Writes a DataStream to the file specified by path in text format. For + * Writes a DataStream to the file specified by path in csv format. For * every element of the DataStream the result of {@link Object#toString()} - * is written. + * is written. This method can only be used on data streams of tuples. * * @param path - * is the path to the location where the tuples are written + * the path pointing to the location the text file is written to * - * @return The closed DataStream + * @return the closed DataStream */ - public DataStreamSink<OUT> writeAsCsv(String path) { - return writeAsCsv(path, 0); + @SuppressWarnings("unchecked") + public <X extends Tuple> DataStreamSink<OUT> writeAsCsv(String path) { + Validate.isTrue(getType().isTupleType(), + "The writeAsCsv() method can only be used on data sets of tuples."); + CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path), + CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER); + return writeToFile((OutputFormat<OUT>) of, 0L); } /** - * Writes a DataStream to the file specified by path in text format. The + * Writes a DataStream to the file specified by path in csv format. The * writing is performed periodically, in every millis milliseconds. For * every element of the DataStream the result of {@link Object#toString()} - * is written. + * is written. This method can only be used on data streams of tuples. 
* * @param path - * is the path to the location where the tuples are written + * the path pointing to the location the text file is written to * @param millis - * is the file update frequency + * the file update frequency * - * @return the data stream constructed + * @return the closed DataStream */ - private DataStreamSink<OUT> writeAsText(String path, WriteFormat<OUT> format, long millis) { - DataStreamSink<OUT> returnStream = addSink(new WriteSinkFunctionByMillis<OUT>(path, format, - millis)); + @SuppressWarnings("unchecked") + public <X extends Tuple> DataStreamSink<OUT> writeAsCsv(String path, long millis) { + Validate.isTrue(getType().isTupleType(), + "The writeAsCsv() method can only be used on data sets of tuples."); + CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path), + CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER); + return writeToFile((OutputFormat<OUT>) of, millis); + } + + /** + * Writes a DataStream to the file specified by path in csv format. For + * every element of the DataStream the result of {@link Object#toString()} + * is written. This method can only be used on data streams of tuples. + * + * @param path + * the path pointing to the location the text file is written to + * @param writeMode + * Controls the behavior for existing files. Options are + * NO_OVERWRITE and OVERWRITE. + * + * @return the closed DataStream + */ + @SuppressWarnings("unchecked") + public <X extends Tuple> DataStreamSink<OUT> writeAsCsv(String path, WriteMode writeMode) { + Validate.isTrue(getType().isTupleType(), + "The writeAsCsv() method can only be used on data sets of tuples."); + CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path), + CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER); + if (writeMode != null) { + of.setWriteMode(writeMode); + } + return writeToFile((OutputFormat<OUT>) of, 0L); + } + + /** + * Writes a DataStream to the file specified by path in csv format. 
The + * writing is performed periodically, in every millis milliseconds. For + * every element of the DataStream the result of {@link Object#toString()} + * is written. This method can only be used on data streams of tuples. + * + * @param path + * the path pointing to the location the text file is written to + * @param writeMode + * Controls the behavior for existing files. Options are + * NO_OVERWRITE and OVERWRITE. + * @param millis + * the file update frequency + * + * @return the closed DataStream + */ + @SuppressWarnings("unchecked") + public <X extends Tuple> DataStreamSink<OUT> writeAsCsv(String path, WriteMode writeMode, + long millis) { + Validate.isTrue(getType().isTupleType(), + "The writeAsCsv() method can only be used on data sets of tuples."); + CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path), + CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER); + if (writeMode != null) { + of.setWriteMode(writeMode); + } + return writeToFile((OutputFormat<OUT>) of, millis); + } + + private DataStreamSink<OUT> writeToFile(OutputFormat<OUT> format, long millis) { + DataStreamSink<OUT> returnStream = addSink(new FileSinkFunctionByMillis<OUT>(format, millis)); return returnStream; } http://git-wip-us.apache.org/repos/asf/flink/blob/dc0d81bb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java new file mode 100644 index 0000000..24beba1 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java @@ -0,0 +1,118 @@ +/* + 
* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.function.sink; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.flink.api.common.io.CleanupWhenUnsuccessful; +import org.apache.flink.api.common.io.OutputFormat; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Simple implementation of the SinkFunction writing tuples in the specified + * OutputFormat format. Tuples are collected to a list and written to the file + * periodically. The target path and the overwrite mode are pre-packaged in + * format. 
+ * + * @param <IN> + * Input type + */ +public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(FileSinkFunction.class); + protected ArrayList<IN> tupleList = new ArrayList<IN>(); + protected volatile OutputFormat<IN> format; + protected volatile boolean cleanupCalled = false; + protected int indexInSubtaskGroup; + protected int currentNumberOfSubtasks; + + public FileSinkFunction(OutputFormat<IN> format) { + this.format = format; + } + + @Override + public void open(Configuration parameters) throws Exception { + StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); + format.configure(context.getTaskStubParameters()); + indexInSubtaskGroup = context.getIndexOfThisSubtask(); + currentNumberOfSubtasks = context.getNumberOfParallelSubtasks(); + format.open(indexInSubtaskGroup, currentNumberOfSubtasks); + } + + @Override + public void invoke(IN record) throws Exception { + tupleList.add(record); + if (updateCondition()) { + flush(); + } + } + + @Override + public void close() throws IOException { + if (!tupleList.isEmpty()) { + flush(); + } + try { + format.close(); + } catch (Exception ex) { + try { + if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) { + cleanupCalled = true; + ((CleanupWhenUnsuccessful) format).tryCleanupOnError(); + } + } catch (Throwable t) { + LOG.error("Cleanup on error failed.", t); + } + } + } + + protected void flush() { + try { + for (IN rec : tupleList) { + format.writeRecord(rec); + } + } catch (Exception ex) { + try { + if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) { + cleanupCalled = true; + ((CleanupWhenUnsuccessful) format).tryCleanupOnError(); + } + } catch (Throwable t) { + LOG.error("Cleanup on error failed.", t); + } + } + resetParameters(); + } + + /** + * Condition for writing the contents of tupleList and clearing it. 
+ * + * @return value of the updating condition + */ + protected abstract boolean updateCondition(); + + /** + * Statements to be executed after writing a batch go here. + */ + protected abstract void resetParameters(); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/dc0d81bb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java new file mode 100644 index 0000000..f049a32 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.function.sink; + +import org.apache.flink.api.common.io.OutputFormat; + +/** + * Implementation of FileSinkFunction.
Writes tuples to file every millis + * milliseconds. + * + * @param <IN> + * Input type + */ +public class FileSinkFunctionByMillis<IN> extends FileSinkFunction<IN> { + private static final long serialVersionUID = 1L; + + private final long millis; + private long lastTime; + + public FileSinkFunctionByMillis(OutputFormat<IN> format, long millis) { + super(format); + this.millis = millis; + lastTime = System.currentTimeMillis(); + } + + /** + * Condition for writing the contents of tupleList and clearing it. + * + * @return value of the updating condition + */ + @Override + protected boolean updateCondition() { + return System.currentTimeMillis() - lastTime >= millis; + } + + /** + * Statements to be executed after writing a batch go here. + */ + @Override + protected void resetParameters() { + tupleList.clear(); + lastTime = System.currentTimeMillis(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/dc0d81bb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java index b89c169..3b8a4db 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java @@ -23,6 +23,6 @@ public abstract class RichSinkFunction<IN> extends AbstractRichFunction implemen private static final long serialVersionUID = 1L; - public abstract void invoke(IN value); + public abstract void invoke(IN value) throws Exception; }
http://git-wip-us.apache.org/repos/asf/flink/blob/dc0d81bb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java index 9db268e..6097603 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java @@ -23,6 +23,6 @@ import org.apache.flink.api.common.functions.Function; public interface SinkFunction<IN> extends Function, Serializable { - public abstract void invoke(IN value); + public abstract void invoke(IN value) throws Exception; }
