This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ba151c6  [FLINK-13832] Rename DefaultRollingPolicy create to builder
ba151c6 is described below

commit ba151c6b278ac7b25b7c514711639cdbd270e1fd
Author: Gyula Fora <[email protected]>
AuthorDate: Tue Aug 27 12:30:22 2019 +0200

    [FLINK-13832] Rename DefaultRollingPolicy create to builder
    
    Closes #9527
---
 .../sink/filesystem/StreamingFileSink.java         |  2 +-
 .../rollingpolicies/DefaultRollingPolicy.java      | 39 ++++++++++++++++++++--
 .../sink/filesystem/BucketAssignerITCases.java     |  2 +-
 .../api/functions/sink/filesystem/BucketTest.java  |  2 +-
 .../api/functions/sink/filesystem/BucketsTest.java |  4 +--
 .../sink/filesystem/RollingPolicyTest.java         | 15 ++++++++-
 .../api/functions/sink/filesystem/TestUtils.java   |  2 +-
 7 files changed, 56 insertions(+), 10 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index 975265c..716b4c9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -202,7 +202,7 @@ public class StreamingFileSink<IN>
                private final String partFileSuffix;
 
                RowFormatBuilder(Path basePath, Encoder<IN> encoder, BucketAssigner<IN, BucketID> bucketAssigner) {
-                       this(basePath, encoder, bucketAssigner, DefaultRollingPolicy.create().build(), 60L * 1000L, new DefaultBucketFactoryImpl<>(), PartFileConfig.DEFAULT_PART_PREFIX, PartFileConfig.DEFAULT_PART_SUFFIX);
+                       this(basePath, encoder, bucketAssigner, DefaultRollingPolicy.builder().build(), 60L * 1000L, new DefaultBucketFactoryImpl<>(), PartFileConfig.DEFAULT_PART_PREFIX, PartFileConfig.DEFAULT_PART_SUFFIX);
                }
 
                private RowFormatBuilder(
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
index f890326..d9bfbcc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
@@ -83,10 +83,34 @@ public final class DefaultRollingPolicy<IN, BucketID> implements RollingPolicy<I
        }
 
        /**
-        * Initiates the instantiation of a {@code DefaultRollingPolicy}.
-        * To finalize it and have the actual policy, call {@code .create()}.
+        * Returns the maximum part file size before rolling.
+        * @return Max size in bytes
         */
-       public static DefaultRollingPolicy.PolicyBuilder create() {
+       public long getMaxPartSize() {
+               return partSize;
+       }
+
+       /**
+        * Returns the maximum time duration a part file can stay open before 
rolling.
+        * @return Time duration in milliseconds
+        */
+       public long getRolloverInterval() {
+               return rolloverInterval;
+       }
+
+       /**
+        * Returns time duration of allowed inactivity after which a part file 
will have to roll.
+        * @return Time duration in milliseconds
+        */
+       public long getInactivityInterval() {
+               return inactivityInterval;
+       }
+
+       /**
+        * Creates a new {@link PolicyBuilder} that is used to configure and 
build
+        * an instance of {@code DefaultRollingPolicy}.
+        */
+       public static DefaultRollingPolicy.PolicyBuilder builder() {
                return new DefaultRollingPolicy.PolicyBuilder(
                                DEFAULT_MAX_PART_SIZE,
                                DEFAULT_ROLLOVER_INTERVAL,
@@ -94,7 +118,16 @@ public final class DefaultRollingPolicy<IN, BucketID> implements RollingPolicy<I
        }
 
        /**
+        * This method is {@link Deprecated}, use {@link 
DefaultRollingPolicy#builder()} instead.
+        */
+       @Deprecated
+       public static DefaultRollingPolicy.PolicyBuilder create() {
+               return builder();
+       }
+
+       /**
         * A helper class that holds the configuration properties for the 
{@link DefaultRollingPolicy}.
+        * The {@link PolicyBuilder#build()} method must be called to 
instantiate the policy.
         */
        @PublicEvolving
        public static final class PolicyBuilder {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
index 8dc44e6..cc91a33 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
@@ -46,7 +46,7 @@ public class BucketAssignerITCases {
 
                final RollingPolicy<String, String> rollingPolicy =
                        DefaultRollingPolicy
-                               .create()
+                               .builder()
                                .withMaxPartSize(7L)
                                .build();
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
index 969ecd7..546a08c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
@@ -369,7 +369,7 @@ public class BucketTest {
 
        private static final String bucketId = "testing-bucket";
 
-       private static final RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.create().build();
+       private static final RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().build();
 
        private static final PartFileWriter.PartFileFactory<String, String> 
partFileFactory =
                        new RowWisePartWriter.Factory<>(new 
SimpleStringEncoder<>());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
index ba2ea57..ac7071d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
@@ -107,7 +107,7 @@ public class BucketsTest {
 
                final RollingPolicy<String, String> onCheckpointRP =
                                DefaultRollingPolicy
-                                               .create()
+                                               .builder()
                                                .withMaxPartSize(7L) // roll 
with 2 elements
                                                .build();
 
@@ -316,7 +316,7 @@ public class BucketsTest {
                                new VerifyingBucketAssigner(timestamp, 
watermark, processingTime),
                                new DefaultBucketFactoryImpl<>(),
                                new RowWisePartWriter.Factory<>(new 
SimpleStringEncoder<>()),
-                               DefaultRollingPolicy.create().build(),
+                               DefaultRollingPolicy.builder().build(),
                                2,
                                new PartFileConfig()
                );
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
index ed3ac0c..be9db3e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
@@ -47,7 +47,7 @@ public class RollingPolicyTest {
 
                final RollingPolicy<String, String> originalRollingPolicy =
                                DefaultRollingPolicy
-                                               .create()
+                                               .builder()
                                                .withMaxPartSize(10L)
                                                .withInactivityInterval(4L)
                                                .withRolloverInterval(11L)
@@ -88,6 +88,19 @@ public class RollingPolicyTest {
        }
 
        @Test
+       public void testDefaultRollingPolicyDeprecatedCreate() throws Exception {
+               DefaultRollingPolicy policy = DefaultRollingPolicy.create()
+                       .withInactivityInterval(10)
+                       .withMaxPartSize(20)
+                       .withRolloverInterval(30)
+                       .build();
+
+               Assert.assertEquals(10, policy.getInactivityInterval());
+               Assert.assertEquals(20, policy.getMaxPartSize());
+               Assert.assertEquals(30, policy.getRolloverInterval());
+       }
+
+       @Test
        public void testRollOnCheckpointPolicy() throws Exception {
                final File outDir = TEMP_FOLDER.newFolder();
                final Path path = new Path(outDir.toURI());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
index 0333df6..55ba6d3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
@@ -62,7 +62,7 @@ public class TestUtils {
 
                final RollingPolicy<Tuple2<String, Integer>, String> 
rollingPolicy =
                                DefaultRollingPolicy
-                                               .create()
+                                               .builder()
                                                .withMaxPartSize(partMaxSize)
                                                .withRolloverInterval(inactivityInterval)
                                                .withInactivityInterval(inactivityInterval)

Reply via email to