dzikosc commented on code in PR #20133:
URL: https://github.com/apache/flink/pull/20133#discussion_r913039077


##########
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java:
##########
@@ -69,104 +81,174 @@ public class FlinkKinesisITCase extends TestLogger {
 
     @Rule public TemporaryFolder temp = new TemporaryFolder();
 
+    @Rule public SharedObjects sharedObjects = SharedObjects.create();
+
     private static final SimpleStringSchema STRING_SCHEMA = new SimpleStringSchema();
 
     private KinesisPubsubClient client;
 
     @Before
-    public void setupClient() {
-        client = new KinesisPubsubClient(kinesalite.getContainerProperties());
+    public void setupClient() throws Exception {
+        client = new KinesisPubsubClient(getContainerProperties());
+        stream = TestNameProvider.getCurrentTestName().replaceAll("\\W", "");
+        client.createTopic(stream, 1, new Properties());
+    }
+
+    @Test
+    public void testStopWithSavepoint() throws Exception {
+        testStopWithSavepoint(false, false);
+    }
+
+    @Test
+    public void testStopWithSavepointWithDrain() throws Exception {
+        testStopWithSavepoint(true, false);
+    }
+
+    @Test
+    @Ignore("Kinesalite does not support EFO")
+    public void testStopWithSavepointWithEfo() throws Exception {
+        testStopWithSavepoint(false, true);
+    }
+
+    @Test
+    @Ignore("Kinesalite does not support EFO")
+    public void testStopWithSavepointWithDrainAndEfo() throws Exception {
+        testStopWithSavepoint(true, true);
     }
 
     /**
     * Tests that pending elements do not cause a deadlock during stop with savepoint (FLINK-17170).
      *
      * <ol>
-     *   <li>The test sets up a stream with 100 records and creates a Flink job that reads them
+     *   <li>The test sets up a stream with 1000 records and creates a Flink job that reads them
     *       very slowly (using up a large chunk of the mailbox's time).
     *   <li>After ensuring that consumption has started, the job is stopped in a parallel thread.
     *   <li>Without the fix of FLINK-17170, the job now has a high chance of deadlocking during
     *       cancel.
      *   <li>With the fix, the job proceeds and we can lift the backpressure.
      * </ol>
      */
-    @Test
-    public void testStopWithSavepoint() throws Exception {
-        client.createTopic(TEST_STREAM, 1, new Properties());
-
+    private void testStopWithSavepoint(boolean drain, boolean efo) throws Exception {
         // add elements to the test stream
         int numElements = 1000;
         client.sendMessage(
-                TEST_STREAM,
+                stream,
                 IntStream.range(0, numElements).mapToObj(String::valueOf).toArray(String[]::new));
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
+        env.enableCheckpointing(100L);
 
-        Properties config = kinesalite.getContainerProperties();
-        config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name());
-        FlinkKinesisConsumer<String> consumer =
-                new FlinkKinesisConsumer<>(TEST_STREAM, STRING_SCHEMA, config);
+        SharedReference<CountDownLatch> savepointTrigger = sharedObjects.add(new CountDownLatch(1));
+        DataStream<String> outputStream =
+                env.addSource(createKinesisConsumer(efo)).map(new WaitingMapper(savepointTrigger));
 
-        DataStream<String> stream = env.addSource(consumer).map(new WaitingMapper());
         // call stop with savepoint in another thread
         ForkJoinTask<Object> stopTask =
                 ForkJoinPool.commonPool()
                         .submit(
                                 () -> {
-                                    WaitingMapper.firstElement.await();
-                                    stopWithSavepoint();
-                                    WaitingMapper.stopped = true;
+                                    savepointTrigger.get().await();
+                                    stopWithSavepoint(drain);
                                     return null;
                                 });
         try {
-            List<String> result = stream.executeAndCollect(10000);
+            List<String> result = outputStream.executeAndCollect(10000);
             // stop with savepoint will most likely only return a small subset of the elements
             // validate that the prefix is as expected
-            assertThat(result, hasSize(lessThan(numElements)));
-            assertThat(
-                    result,
-                    equalTo(
-                            IntStream.range(0, numElements)
-                                    .mapToObj(String::valueOf)
-                                    .collect(Collectors.toList())
-                                    .subList(0, result.size())));
+            if (drain) {
+                assertThat(
+                        result,
+                        contains(
+                                IntStream.range(0, numElements)
+                                        .mapToObj(String::valueOf)
+                                        .toArray()));
+            } else {
+                // stop with savepoint will most likely only return a small subset of the elements
+                // validate that the prefix is as expected
+                assertThat(
+                        result,
+                        contains(
+                                IntStream.range(0, result.size())
+                                        .mapToObj(String::valueOf)
+                                        .toArray()));
+            }
         } finally {
-            stopTask.cancel(true);
+            stopTask.get();
+        }
+    }
+
+    private FlinkKinesisConsumer<String> createKinesisConsumer(boolean efo) {
+        Properties config = getContainerProperties();
+        config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name());
+        if (efo) {
+            config.putIfAbsent(RECORD_PUBLISHER_TYPE, "EFO");
+            config.putIfAbsent(EFO_CONSUMER_NAME, "efo-flink-app");
         }
+        return new FlinkKinesisConsumer<>(stream, STRING_SCHEMA, config);
+    }
+
+    private Properties getContainerProperties() {
+        return kinesalite.getContainerProperties();
     }
 
-    private String stopWithSavepoint() throws Exception {
+    private String stopWithSavepoint(boolean drain) throws Exception {
         JobStatusMessage job =
                 MINI_CLUSTER.getClusterClient().listJobs().get().stream().findFirst().get();
         return MINI_CLUSTER
                 .getClusterClient()
                 .stopWithSavepoint(
                         job.getJobId(),
-                        true,
+                        drain,
                         temp.getRoot().getAbsolutePath(),
                         SavepointFormatType.CANONICAL)
                 .get();
     }
 
-    private static class WaitingMapper implements MapFunction<String, String> {
-        static CountDownLatch firstElement;
-        static volatile boolean stopped;
+    private static class WaitingMapper
+            implements MapFunction<String, String>, CheckpointedFunction {
+        private final SharedReference<CountDownLatch> savepointTrigger;
+        private volatile boolean savepointTriggered;
+        // keep track of when the last checkpoint occurred
+        private transient Deadline checkpointDeadline;
+        private final AtomicInteger numElements = new AtomicInteger();
+
+        WaitingMapper(SharedReference<CountDownLatch> savepointTrigger) {
+            this.savepointTrigger = savepointTrigger;
+            checkpointDeadline = Deadline.fromNow(Duration.ofDays(1));
+        }
 
-        WaitingMapper() {
-            firstElement = new CountDownLatch(1);
-            stopped = false;
+        private void readObject(ObjectInputStream stream)
+                throws ClassNotFoundException, IOException {
+            stream.defaultReadObject();
+            checkpointDeadline = Deadline.fromNow(Duration.ofDays(1));
         }
 
         @Override
         public String map(String value) throws Exception {
-            if (firstElement.getCount() > 0) {
-                firstElement.countDown();
-            }
-            if (!stopped) {
-                Thread.sleep(100);
+            numElements.incrementAndGet();
+            if (!savepointTriggered) {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {

Review Comment:
   The primary function of this mapper is to slow down data ingestion until the savepoint is triggered.
   
   We could mark the operator thread as interrupted, but I don't think this would have any broader impact.
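   
   For illustration, a minimal, hypothetical sketch of that alternative (this is not the PR code; the `savepointTriggered` and `numElements` fields mirror `WaitingMapper` from the diff, and restoring the interrupt flag is just one option):
   
   ```java
   import java.util.concurrent.atomic.AtomicInteger;
   
   // Hypothetical standalone sketch: a throttling mapper that restores the
   // interrupt flag instead of swallowing the InterruptedException.
   class ThrottlingMapperSketch {
       private volatile boolean savepointTriggered;
       private final AtomicInteger numElements = new AtomicInteger();
   
       String map(String value) {
           numElements.incrementAndGet();
           if (!savepointTriggered) {
               try {
                   // Throttle ingestion until the savepoint is triggered.
                   Thread.sleep(100);
               } catch (InterruptedException e) {
                   // Re-set the interrupt flag so the enclosing task thread
                   // can still observe cancellation; the mapper itself only
                   // needs to stop throttling, so nothing else is done here.
                   Thread.currentThread().interrupt();
               }
           }
           return value;
       }
   }
   ```
   
   Either way the mapper keeps forwarding records, so the throttling is best-effort and ends once `savepointTriggered` flips.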
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
