pnowojski commented on code in PR #24857:
URL: https://github.com/apache/flink/pull/24857#discussion_r1617557245


##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java:
##########
@@ -336,38 +335,57 @@ public void create(
         },
         CUSTOM_PARTITIONER {
             final int sinkParallelism = 3;
-            final int numberElements = 1000;
 
             @Override
             public void create(
-                    StreamExecutionEnvironment environment,
+                    StreamExecutionEnvironment env,
                     int minCheckpoints,
                     boolean slotSharing,
-                    int expectedFailuresUntilSourceFinishes,
+                    int expectedRestarts,
                     long sourceSleepMs) {

Review Comment:
   The old version silently ignored the `minCheckpoints`, `expectedRestarts`
and `sourceSleepMs` parameters without throwing any kind of error.



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java:
##########
@@ -336,38 +335,57 @@ public void create(
         },
         CUSTOM_PARTITIONER {
             final int sinkParallelism = 3;
-            final int numberElements = 1000;
 
             @Override
             public void create(
-                    StreamExecutionEnvironment environment,
+                    StreamExecutionEnvironment env,
                     int minCheckpoints,
                     boolean slotSharing,
-                    int expectedFailuresUntilSourceFinishes,
+                    int expectedRestarts,
                     long sourceSleepMs) {
-                int parallelism = environment.getParallelism();
-                environment
-                        .fromData(generateStrings(numberElements / 
parallelism, sinkParallelism))
+                int parallelism = env.getParallelism();
+
+                env.fromSource(
+                                new LongSource(
+                                        minCheckpoints,
+                                        parallelism,
+                                        expectedRestarts,
+                                        env.getCheckpointInterval(),
+                                        sourceSleepMs),
+                                noWatermarks(),
+                                "source")
                         .name("source")
+                        .uid("source")

Review Comment:
   The old version was missing a `uid`, which could cause problems after rescaling.



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java:
##########
@@ -761,43 +778,12 @@ public int partition(String key, int numPartitions) {
         }
     }
 
-    private static class StringSink implements SinkFunction<String>, 
CheckpointedFunction {
-
-        static volatile boolean failed = false;
-
-        int checkpointConsumed = 0;
-
-        int recordsConsumed = 0;
-
-        final int numberElements;
-
-        public StringSink(int numberElements) {
-            this.numberElements = numberElements;
-        }
-
-        @Override
-        public void invoke(String value, Context ctx) throws Exception {
-            if (!failed && checkpointConsumed > 1) {
-                failed = true;
-                throw new TestException("FAIL");
-            }
-            recordsConsumed++;
-            if (!failed && recordsConsumed == (numberElements / 3)) {
-                Thread.sleep(1000);
-            }
-            if (recordsConsumed == (numberElements - 1)) {
-                Thread.sleep(1000);
-            }
-        }
-
-        @Override
-        public void snapshotState(FunctionSnapshotContext context) {
-            checkpointConsumed++;
-        }
-
+    private static class BackPressureInducingSink<T> implements 
SinkFunction<T> {
         @Override
-        public void initializeState(FunctionInitializationContext context) {
-            // do  nothing
+        public void invoke(T value, Context ctx) throws Exception {
+            // TODO: maybe similarly to VerifyingSink, we should back pressure 
only until some point
+            // but currently it doesn't seem to be needed (test runs quickly 
enough)
+            Thread.sleep(1);

Review Comment:
   It looks like the simple version is good enough.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to