Repository: flink Updated Branches: refs/heads/release-1.4 a0193f10a -> b74c70515
[FLINK-8543] Don't call super.close() in AvroKeyValueSinkWriter The call to keyValueWriter.close() in AvroKeyValueSinkWriter.close() will eventually call flush() on the wrapped stream which fails if we close it beforehand. Now we call flush ourselves before closing the KeyValueWriter, which internally closes the wrapped stream eventually. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b74c7051 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b74c7051 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b74c7051 Branch: refs/heads/release-1.4 Commit: b74c705157fef3e0d305fdd5bf1a006ae0a98666 Parents: a0193f1 Author: Aljoscha Krettek <[email protected]> Authored: Thu Feb 22 17:24:33 2018 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Feb 26 21:12:02 2018 +0100 ---------------------------------------------------------------------- .../connectors/fs/AvroKeyValueSinkWriter.java | 22 +++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b74c7051/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java index e931633..6b2f7d6 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java @@ -150,17 +150,29 @@ public class 
AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>> public void open(FileSystem fs, Path path) throws IOException { super.open(fs, path); - CodecFactory compressionCodec = getCompressionCodec(properties); - Schema keySchema = Schema.parse(properties.get(CONF_OUTPUT_KEY_SCHEMA)); - Schema valueSchema = Schema.parse(properties.get(CONF_OUTPUT_VALUE_SCHEMA)); - keyValueWriter = new AvroKeyValueWriter<K, V>(keySchema, valueSchema, compressionCodec, getStream()); + try { + CodecFactory compressionCodec = getCompressionCodec(properties); + Schema keySchema = Schema.parse(properties.get(CONF_OUTPUT_KEY_SCHEMA)); + Schema valueSchema = Schema.parse(properties.get(CONF_OUTPUT_VALUE_SCHEMA)); + keyValueWriter = new AvroKeyValueWriter<K, V>( + keySchema, + valueSchema, + compressionCodec, + getStream()); + } finally { + if (keyValueWriter == null) { + close(); + } + } } @Override public void close() throws IOException { - super.close(); //the order is important since super.close flushes inside if (keyValueWriter != null) { keyValueWriter.close(); + } else { + // need to make sure we close this if we never created the Key/Value Writer. + super.close(); } }
