Repository: flink Updated Branches: refs/heads/master 2f7392d77 -> 9da315234
[FLINK-8600] Allow disabling truncate() check in BucketingSink The truncate() support check, which writes a test file, was failing when using PrestoS3FileSystem because that file system doesn't use an absolute/qualified path. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9da31523 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9da31523 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9da31523 Branch: refs/heads/master Commit: 9da315234d1583078a24d3d31a5dcc89a54c5dc6 Parents: 2f7392d Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Wed Feb 14 14:48:22 2018 +0100 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Tue Feb 20 09:31:55 2018 +0100 ---------------------------------------------------------------------- .../connectors/fs/bucketing/BucketingSink.java | 27 ++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9da31523/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java index 6293fe0..6e7f460 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java @@ -265,6 +265,8 @@ public class BucketingSink<T> private String partPrefix = DEFAULT_PART_REFIX; + private boolean useTruncate = true; + /** * The timeout for asynchronous operations such as 
recoverLease and truncate (in {@code ms}). */ @@ -572,6 +574,12 @@ public class BucketingSink<T> * <p><b>NOTE:</b> This code comes from Flume. */ private Method reflectTruncate(FileSystem fs) { + // completely disable the check for truncate() because the check can be problematic + // on some filesystem implementations + if (!useTruncate) { + return null; + } + Method m = null; if (fs != null) { Class<?> fsClass = fs.getClass(); @@ -592,7 +600,9 @@ public class BucketingSink<T> outputStream.close(); } catch (IOException e) { LOG.error("Could not create file for checking if truncate works.", e); - throw new RuntimeException("Could not create file for checking if truncate works.", e); + throw new RuntimeException("Could not create file for checking if truncate works. " + + "You can disable support for truncate() completely via " + + "BucketingSink.setUseTruncate(false).", e); } try { @@ -606,7 +616,9 @@ public class BucketingSink<T> fs.delete(testPath, false); } catch (IOException e) { LOG.error("Could not delete truncate test file.", e); - throw new RuntimeException("Could not delete truncate test file.", e); + throw new RuntimeException("Could not delete truncate test file. " + + "You can disable support for truncate() completely via " + + "BucketingSink.setUseTruncate(false).", e); } } return m; @@ -983,6 +995,17 @@ public class BucketingSink<T> } /** + * Sets whether to use {@code FileSystem.truncate()} to truncate written bucket files back to + * a consistent state in case of a restore from checkpoint. If {@code truncate()} is not used + * this sink will write valid-length files for corresponding bucket files that have to be used + * when reading from bucket files to make sure to not read too far. + */ + public BucketingSink<T> setUseTruncate(boolean useTruncate) { + this.useTruncate = useTruncate; + return this; + } + + /** * Disable cleanup of leftover in-progress/pending files when the sink is opened. * *