Repository: flink
Updated Branches:
  refs/heads/master 2f7392d77 -> 9da315234


[FLINK-8600] Allow disabling truncate() check in BucketingSink

The truncate() test in BucketingSink.reflectTruncate() was failing when using
PrestoS3FileSystem because the test doesn't use an absolute/qualified path.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9da31523
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9da31523
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9da31523

Branch: refs/heads/master
Commit: 9da315234d1583078a24d3d31a5dcc89a54c5dc6
Parents: 2f7392d
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Wed Feb 14 14:48:22 2018 +0100
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Tue Feb 20 09:31:55 2018 +0100

----------------------------------------------------------------------
 .../connectors/fs/bucketing/BucketingSink.java  | 27 ++++++++++++++++++--
 1 file changed, 25 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9da31523/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 6293fe0..6e7f460 100644
--- 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -265,6 +265,8 @@ public class BucketingSink<T>
 
        private String partPrefix = DEFAULT_PART_REFIX;
 
+       private boolean useTruncate = true;
+
        /**
         * The timeout for asynchronous operations such as recoverLease and 
truncate (in {@code ms}).
         */
@@ -572,6 +574,12 @@ public class BucketingSink<T>
         * <p><b>NOTE:</b> This code comes from Flume.
         */
        private Method reflectTruncate(FileSystem fs) {
+               // completely disable the check for truncate() because the 
check can be problematic
+               // on some filesystem implementations
+               if (!useTruncate) {
+                       return null;
+               }
+
                Method m = null;
                if (fs != null) {
                        Class<?> fsClass = fs.getClass();
@@ -592,7 +600,9 @@ public class BucketingSink<T>
                                outputStream.close();
                        } catch (IOException e) {
                                LOG.error("Could not create file for checking 
if truncate works.", e);
-                               throw new RuntimeException("Could not create 
file for checking if truncate works.", e);
+                               throw new RuntimeException("Could not create 
file for checking if truncate works. " +
+                                       "You can disable support for truncate() 
completely via " +
+                                       "BucketingSink.setUseTruncate(false).", 
e);
                        }
 
                        try {
@@ -606,7 +616,9 @@ public class BucketingSink<T>
                                fs.delete(testPath, false);
                        } catch (IOException e) {
                                LOG.error("Could not delete truncate test 
file.", e);
-                               throw new RuntimeException("Could not delete 
truncate test file.", e);
+                               throw new RuntimeException("Could not delete 
truncate test file. " +
+                                       "You can disable support for truncate() 
completely via " +
+                                       "BucketingSink.setUseTruncate(false).", 
e);
                        }
                }
                return m;
@@ -983,6 +995,17 @@ public class BucketingSink<T>
        }
 
        /**
+        * Sets whether to use {@code FileSystem.truncate()} to truncate 
written bucket files back to
+        * a consistent state in case of a restore from checkpoint. If {@code 
truncate()} is not used
+        * this sink will write valid-length files for corresponding bucket 
files that have to be used
+        * when reading from bucket files to make sure to not read too far.
+        */
+       public BucketingSink<T> setUseTruncate(boolean useTruncate) {
+               this.useTruncate = useTruncate;
+               return this;
+       }
+
+       /**
         * Disable cleanup of leftover in-progress/pending files when the sink 
is opened.
         *
         *

Reply via email to