Repository: flink
Updated Branches:
  refs/heads/release-1.4 a0193f10a -> b74c70515


[FLINK-8543] Don't call super.close() in AvroKeyValueSinkWriter

The call to keyValueWriter.close() in AvroKeyValueSinkWriter.close()
will eventually call flush() on the wrapped stream which fails if we
close it before(). Now we call flush ourselves before closing the
KeyValyeWriter, which internally closes the wrapped stream eventually.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b74c7051
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b74c7051
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b74c7051

Branch: refs/heads/release-1.4
Commit: b74c705157fef3e0d305fdd5bf1a006ae0a98666
Parents: a0193f1
Author: Aljoscha Krettek <[email protected]>
Authored: Thu Feb 22 17:24:33 2018 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon Feb 26 21:12:02 2018 +0100

----------------------------------------------------------------------
 .../connectors/fs/AvroKeyValueSinkWriter.java   | 22 +++++++++++++++-----
 1 file changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b74c7051/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
index e931633..6b2f7d6 100644
--- 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
+++ 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
@@ -150,17 +150,29 @@ public class AvroKeyValueSinkWriter<K, V> extends 
StreamWriterBase<Tuple2<K, V>>
        public void open(FileSystem fs, Path path) throws IOException {
                super.open(fs, path);
 
-               CodecFactory compressionCodec = getCompressionCodec(properties);
-               Schema keySchema = 
Schema.parse(properties.get(CONF_OUTPUT_KEY_SCHEMA));
-               Schema valueSchema = 
Schema.parse(properties.get(CONF_OUTPUT_VALUE_SCHEMA));
-               keyValueWriter = new AvroKeyValueWriter<K, V>(keySchema, 
valueSchema, compressionCodec, getStream());
+               try {
+                       CodecFactory compressionCodec = 
getCompressionCodec(properties);
+                       Schema keySchema = 
Schema.parse(properties.get(CONF_OUTPUT_KEY_SCHEMA));
+                       Schema valueSchema = 
Schema.parse(properties.get(CONF_OUTPUT_VALUE_SCHEMA));
+                       keyValueWriter = new AvroKeyValueWriter<K, V>(
+                               keySchema,
+                               valueSchema,
+                               compressionCodec,
+                               getStream());
+               } finally {
+                       if (keyValueWriter == null) {
+                               close();
+                       }
+               }
        }
 
        @Override
        public void close() throws IOException {
-               super.close(); //the order is important since super.close 
flushes inside
                if (keyValueWriter != null) {
                        keyValueWriter.close();
+               } else {
+                       // need to make sure we close this if we never created 
the Key/Value Writer.
+                       super.close();
                }
        }
 

Reply via email to