Repository: flink Updated Branches: refs/heads/master a2d1d084b -> 915213c7a
[FLINK-8543] Don't call super.close() in AvroKeyValueSinkWriter The call to keyValueWriter.close() in AvroKeyValueSinkWriter.close() will eventually call flush() on the wrapped stream which fails if we close it beforehand. Now we call flush ourselves before closing the KeyValueWriter, which internally closes the wrapped stream eventually. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/915213c7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/915213c7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/915213c7 Branch: refs/heads/master Commit: 915213c7afaf3f9d04c240f43d88710280d844e3 Parents: a2d1d08 Author: Aljoscha Krettek <[email protected]> Authored: Thu Feb 22 17:24:33 2018 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Feb 26 21:10:29 2018 +0100 ---------------------------------------------------------------------- .../connectors/fs/AvroKeyValueSinkWriter.java | 22 +++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/915213c7/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java index e931633..6b2f7d6 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java @@ -150,17 +150,29 @@ public class 
AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>> public void open(FileSystem fs, Path path) throws IOException { super.open(fs, path); - CodecFactory compressionCodec = getCompressionCodec(properties); - Schema keySchema = Schema.parse(properties.get(CONF_OUTPUT_KEY_SCHEMA)); - Schema valueSchema = Schema.parse(properties.get(CONF_OUTPUT_VALUE_SCHEMA)); - keyValueWriter = new AvroKeyValueWriter<K, V>(keySchema, valueSchema, compressionCodec, getStream()); + try { + CodecFactory compressionCodec = getCompressionCodec(properties); + Schema keySchema = Schema.parse(properties.get(CONF_OUTPUT_KEY_SCHEMA)); + Schema valueSchema = Schema.parse(properties.get(CONF_OUTPUT_VALUE_SCHEMA)); + keyValueWriter = new AvroKeyValueWriter<K, V>( + keySchema, + valueSchema, + compressionCodec, + getStream()); + } finally { + if (keyValueWriter == null) { + close(); + } + } } @Override public void close() throws IOException { - super.close(); //the order is important since super.close flushes inside if (keyValueWriter != null) { keyValueWriter.close(); + } else { + // need to make sure we close this if we never created the Key/Value Writer. + super.close(); } }
