RocMarshal commented on code in PR #200:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/200#discussion_r3460184017


##########
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReaderTest.java:
##########
@@ -144,4 +146,120 @@ void testFetch() throws Exception {
         assertThat(fetchedRecordsWithSplitIds.nextSplit()).isNull();
         splitReader.close();
     }
+
+    @Test
+    void testFetchReconnectsWhenConnectionClosedWhileOpeningSplit() throws 
Exception {
+        // The provider hands back an already-closed connection the first 
time, so the reader's
+        // first use of it fails (mimicking the connection being torn down, 
e.g. on source
+        // cancellation, before the reader finishes opening the split). The 
reader must
+        // re-establish the connection and read the whole split.
+        CountingConnectionProvider provider = new 
CountingConnectionProvider(connectionProvider, 1);
+        JdbcSourceSplitReader<TestEntry> splitReader = newReader(provider, 
split);
+        try {

Review Comment:
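   Could these lines use try-with-resources instead? For example:
   ```
   try (JdbcSourceSplitReader<TestEntry> splitReader = newReader(provider, 
split)) {
   ....
   }
   
   // The explicit close call in the finally block could then be deleted:
   // splitReader.close()
   
   ```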



##########
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReaderTest.java:
##########
@@ -144,4 +146,120 @@ void testFetch() throws Exception {
         assertThat(fetchedRecordsWithSplitIds.nextSplit()).isNull();
         splitReader.close();
     }
+
+    @Test
+    void testFetchReconnectsWhenConnectionClosedWhileOpeningSplit() throws 
Exception {
+        // The provider hands back an already-closed connection the first 
time, so the reader's
+        // first use of it fails (mimicking the connection being torn down, 
e.g. on source
+        // cancellation, before the reader finishes opening the split). The 
reader must
+        // re-establish the connection and read the whole split.
+        CountingConnectionProvider provider = new 
CountingConnectionProvider(connectionProvider, 1);
+        JdbcSourceSplitReader<TestEntry> splitReader = newReader(provider, 
split);
+        try {
+            RecordsWithSplitIds<RecordAndOffset<TestEntry>> fetched = 
splitReader.fetch();
+            assertThat(fetched.nextSplit()).isEqualTo("1");
+            List<TestEntry> records = new ArrayList<>();
+            RecordAndOffset<TestEntry> recordAndOffset = 
fetched.nextRecordFromSplit();
+            while (recordAndOffset != null) {
+                records.add(recordAndOffset.record);
+                recordAndOffset = fetched.nextRecordFromSplit();
+            }
+            assertThat(records).hasSize(TEST_DATA.length);
+            // The first (closed) connection plus exactly one re-established 
one: a reconnect ran.
+            assertThat(provider.establishCount).isEqualTo(2);
+        } finally {
+            splitReader.close();
+        }
+    }
+
+    @Test
+    void testFetchRethrowsImmediatelyWhenConnectionStaysOpen() throws 
Exception {
+        // A query error on a healthy (open) connection must be rethrown 
immediately, with no
+        // reconnect attempt.
+        CountingConnectionProvider provider = new 
CountingConnectionProvider(connectionProvider, 0);
+        JdbcSourceSplit invalidSplit =
+                new JdbcSourceSplit("1", "select * from NON_EXISTENT_TABLE", 
null, null);
+        JdbcSourceSplitReader<TestEntry> splitReader = newReader(provider, 
invalidSplit);
+        try {

Review Comment:
   If so, ditto: the same try-with-resources change applies here.



##########
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReaderTest.java:
##########
@@ -144,4 +146,120 @@ void testFetch() throws Exception {
         assertThat(fetchedRecordsWithSplitIds.nextSplit()).isNull();
         splitReader.close();
     }
+
+    @Test
+    void testFetchReconnectsWhenConnectionClosedWhileOpeningSplit() throws 
Exception {
+        // The provider hands back an already-closed connection the first 
time, so the reader's
+        // first use of it fails (mimicking the connection being torn down, 
e.g. on source
+        // cancellation, before the reader finishes opening the split). The 
reader must
+        // re-establish the connection and read the whole split.
+        CountingConnectionProvider provider = new 
CountingConnectionProvider(connectionProvider, 1);
+        JdbcSourceSplitReader<TestEntry> splitReader = newReader(provider, 
split);
+        try {
+            RecordsWithSplitIds<RecordAndOffset<TestEntry>> fetched = 
splitReader.fetch();
+            assertThat(fetched.nextSplit()).isEqualTo("1");
+            List<TestEntry> records = new ArrayList<>();
+            RecordAndOffset<TestEntry> recordAndOffset = 
fetched.nextRecordFromSplit();
+            while (recordAndOffset != null) {
+                records.add(recordAndOffset.record);
+                recordAndOffset = fetched.nextRecordFromSplit();
+            }
+            assertThat(records).hasSize(TEST_DATA.length);
+            // The first (closed) connection plus exactly one re-established 
one: a reconnect ran.
+            assertThat(provider.establishCount).isEqualTo(2);
+        } finally {
+            splitReader.close();
+        }
+    }
+
+    @Test
+    void testFetchRethrowsImmediatelyWhenConnectionStaysOpen() throws 
Exception {
+        // A query error on a healthy (open) connection must be rethrown 
immediately, with no
+        // reconnect attempt.
+        CountingConnectionProvider provider = new 
CountingConnectionProvider(connectionProvider, 0);
+        JdbcSourceSplit invalidSplit =
+                new JdbcSourceSplit("1", "select * from NON_EXISTENT_TABLE", 
null, null);
+        JdbcSourceSplitReader<TestEntry> splitReader = newReader(provider, 
invalidSplit);
+        try {
+            
assertThatThrownBy(splitReader::fetch).isInstanceOf(RuntimeException.class);
+            assertThat(provider.establishCount).isEqualTo(1);
+        } finally {
+            splitReader.close();
+        }
+    }
+
+    @Test
+    void testFetchFailsAfterExhaustingReconnectRetries() throws Exception {
+        // If the connection keeps being closed, the reader gives up after the 
retry budget
+        // instead of looping forever.
+        CountingConnectionProvider provider =
+                new CountingConnectionProvider(connectionProvider, 
Integer.MAX_VALUE);
+        JdbcSourceSplitReader<TestEntry> splitReader = newReader(provider, 
split);
+        try {

Review Comment:
   If so, ditto: the same try-with-resources change applies here.



##########
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReaderTest.java:
##########
@@ -144,4 +146,120 @@ void testFetch() throws Exception {
         assertThat(fetchedRecordsWithSplitIds.nextSplit()).isNull();
         splitReader.close();
     }
+
+    @Test
+    void testFetchReconnectsWhenConnectionClosedWhileOpeningSplit() throws 
Exception {
+        // The provider hands back an already-closed connection the first 
time, so the reader's
+        // first use of it fails (mimicking the connection being torn down, 
e.g. on source
+        // cancellation, before the reader finishes opening the split). The 
reader must
+        // re-establish the connection and read the whole split.
+        CountingConnectionProvider provider = new 
CountingConnectionProvider(connectionProvider, 1);
+        JdbcSourceSplitReader<TestEntry> splitReader = newReader(provider, 
split);
+        try {
+            RecordsWithSplitIds<RecordAndOffset<TestEntry>> fetched = 
splitReader.fetch();
+            assertThat(fetched.nextSplit()).isEqualTo("1");
+            List<TestEntry> records = new ArrayList<>();
+            RecordAndOffset<TestEntry> recordAndOffset = 
fetched.nextRecordFromSplit();
+            while (recordAndOffset != null) {
+                records.add(recordAndOffset.record);
+                recordAndOffset = fetched.nextRecordFromSplit();
+            }
+            assertThat(records).hasSize(TEST_DATA.length);
+            // The first (closed) connection plus exactly one re-established 
one: a reconnect ran.
+            assertThat(provider.establishCount).isEqualTo(2);
+        } finally {
+            splitReader.close();

Review Comment:
   If so, this explicit `close()` call (and its `finally` block) can be deleted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to