Changeset: b75464874130 for monetdb-java
URL: https://dev.monetdb.org/hg/monetdb-java/rev/b75464874130
Modified Files:
        src/main/java/org/monetdb/mcl/net/MapiSocket.java
        tests/OnClientTester.java
Branch: onclient
Log Message:

Keep better track of whether the server has cancelled the upload

IOExceptions are suppressed by PrintStream's print/println methods.
This means the client may not realize the server cancelled the upload,
so we must suppress all further attempts to write.

Also, the end-of-upload handshake is different if a cancellation occurred.


diffs (188 lines):

diff --git a/src/main/java/org/monetdb/mcl/net/MapiSocket.java 
b/src/main/java/org/monetdb/mcl/net/MapiSocket.java
--- a/src/main/java/org/monetdb/mcl/net/MapiSocket.java
+++ b/src/main/java/org/monetdb/mcl/net/MapiSocket.java
@@ -1194,6 +1194,7 @@ public class MapiSocket { /* cannot (yet
                public final static int DEFAULT_CHUNK_SIZE = 1024 * 1024;
                private final int chunkSize;
                private boolean closed = false;
+               private boolean serverCancelled = false;
                private int chunkLeft;
                private byte[] promptBuffer;
 
@@ -1215,6 +1216,12 @@ public class MapiSocket {        /* cannot (yet
 
                @Override
                public void write(int b) throws IOException {
+                       if (serverCancelled) {
+                               // We have already thrown an exception and 
apparently that has been ignored.
+                               // Probably because they're calling print 
methods instead of write.
+                               // Throw another one, maybe they'll catch this 
one.
+                               throw new IOException("Server aborted the 
upload");
+                       }
                        handleChunking();
                        super.write(b);
                        wrote(1);
@@ -1227,6 +1234,12 @@ public class MapiSocket {        /* cannot (yet
 
                @Override
                public void write(byte[] b, int off, int len) throws 
IOException {
+                       if (serverCancelled) {
+                               // We have already thrown an exception and 
apparently that has been ignored.
+                               // Probably because they're calling print 
methods instead of write.
+                               // Throw another one, maybe they'll catch this 
one.
+                               throw new IOException("Server aborted the 
upload");
+                       }
                        while (len > 0) {
                                handleChunking();
                                int toWrite = Integer.min(len, chunkLeft);
@@ -1247,6 +1260,15 @@ public class MapiSocket {        /* cannot (yet
                        if (closed) {
                                return;
                        }
+                       closed = true;
+
+                       if (serverCancelled)
+                               closeAfterServerCancelled();
+                       else
+                               closeAfterSuccesfulUpload();
+               }
+
+               private void closeAfterSuccesfulUpload() throws IOException {
                        if (chunkLeft != chunkSize) {
                                // flush pending data
                                flushAndReadPrompt();
@@ -1259,6 +1281,10 @@ public class MapiSocket {        /* cannot (yet
                        }
                }
 
+               private void closeAfterServerCancelled() throws IOException {
+                       // nothing to do here, we have already read the error 
prompt.
+               }
+
                private void wrote(int i) {
                        chunkLeft -= i;
                }
@@ -1278,6 +1304,9 @@ public class MapiSocket { /* cannot (yet
                                case MORE:
                                        return;
                                case FILETRANSFER:
+                                       // Note, if the caller is calling print 
methods instead of write, the IO exception gets hidden.
+                                       // This is unfortunate but there's 
nothing we can do about it.
+                                       serverCancelled = true;
                                        throw new IOException("Server aborted 
the upload");
                                default:
                                        throw new IOException("Expected 
MORE/DONE from server, got " + lineType);
diff --git a/tests/OnClientTester.java b/tests/OnClientTester.java
--- a/tests/OnClientTester.java
+++ b/tests/OnClientTester.java
@@ -49,46 +49,58 @@ public final class OnClientTester extend
 
        public void test_Upload() throws Exception {
                prepare();
-               conn.setUploadHandler(new MyUploadHandler(100));
+               MyUploadHandler handler = new MyUploadHandler(100);
+               conn.setUploadHandler(handler);
                update("COPY INTO foo FROM 'banana' ON CLIENT", 100);
+               assertEq("handler encountered write error", false, 
handler.encounteredWriteError());
                queryInt("SELECT COUNT(*) FROM foo", 100);
        }
 
        public void test_ClientRefusesUpload() throws Exception {
                prepare();
-               conn.setUploadHandler(new MyUploadHandler("immediate error"));
+               MyUploadHandler handler = new MyUploadHandler("immediate 
error");
+               conn.setUploadHandler(handler);
                expectError("COPY INTO foo FROM 'banana' ON CLIENT", "immediate 
error");
+               assertEq("handler encountered write error", false, 
handler.encounteredWriteError());
                queryInt("SELECT COUNT(*) FROM foo", 0);
        }
 
        public void test_Offset0() throws SQLException, Failure {
                prepare();
-               conn.setUploadHandler(new MyUploadHandler(100));
+               MyUploadHandler handler = new MyUploadHandler(100);
+               conn.setUploadHandler(handler);
                update("COPY OFFSET 0 INTO foo FROM 'banana' ON CLIENT", 100);
+               assertEq("handler encountered write error", false, 
handler.encounteredWriteError());
                queryInt("SELECT MIN(i) FROM foo", 1);
                queryInt("SELECT MAX(i) FROM foo", 100);
        }
 
        public void test_Offset1() throws SQLException, Failure {
                prepare();
-               conn.setUploadHandler(new MyUploadHandler(100));
+               MyUploadHandler handler = new MyUploadHandler(100);
+               conn.setUploadHandler(handler);
                update("COPY OFFSET 1 INTO foo FROM 'banana' ON CLIENT", 100);
+               assertEq("handler encountered write error", false, 
handler.encounteredWriteError());
                queryInt("SELECT MIN(i) FROM foo", 1);
                queryInt("SELECT MAX(i) FROM foo", 100);
        }
 
        public void test_Offset5() throws SQLException, Failure {
                prepare();
-               conn.setUploadHandler(new MyUploadHandler(100));
+               MyUploadHandler handler = new MyUploadHandler(100);
+               conn.setUploadHandler(handler);
                update("COPY OFFSET 5 INTO foo FROM 'banana' ON CLIENT", 96);
+               assertEq("handler encountered write error", false, 
handler.encounteredWriteError());
                queryInt("SELECT MIN(i) FROM foo", 5);
                queryInt("SELECT MAX(i) FROM foo", 100);
        }
 
        public void test_ServerStopsReading() throws SQLException, Failure {
                prepare();
-               conn.setUploadHandler(new MyUploadHandler(100));
-               update("COPY 10 RECORDS INTO foo FROM 'banana' ON CLIENT", 96);
+               MyUploadHandler handler = new MyUploadHandler(100);
+               conn.setUploadHandler(handler);
+               update("COPY 10 RECORDS INTO foo FROM 'banana' ON CLIENT", 10);
+               assertEq("handler encountered write error", true, 
handler.encounteredWriteError());
                // Server stopped reading after 10 rows. Will we stay in sync?
                queryInt("SELECT COUNT(i) FROM foo", 10);
        }
@@ -125,6 +137,7 @@ public final class OnClientTester extend
                conn.setUploadHandler(handler);
                handler.setChunkSize(1024 * 1024);
                update("COPY INTO foo FROM 'banana' ON CLIENT", n);
+               assertEq("handler encountered write error", false, 
handler.encounteredWriteError());
                queryInt("SELECT COUNT(DISTINCT i) FROM foo", n);
        }
 
@@ -183,8 +196,10 @@ public final class OnClientTester extend
 
        public void test_FailUploadLate() throws SQLException, Failure {
                prepare();
-               conn.setUploadHandler(new MyUploadHandler(100, 50, "i don't 
like line 50"));
+               MyUploadHandler handler = new MyUploadHandler(100, 50, "i don't 
like line 50");
+               conn.setUploadHandler(handler);
                expectError("COPY INTO foo FROM 'banana' ON CLIENT", "i don't 
like");
+               assertEq("handler encountered write error", false, 
handler.encounteredWriteError());
                assertEq("connection is closed", true, conn.isClosed());
        }
 
@@ -202,6 +217,7 @@ public final class OnClientTester extend
                private final int rows;
                private final int errorAt;
                private final String errorMessage;
+               private boolean encounteredWriteError;
 
                private int chunkSize = 100; // small number to trigger more 
bugs
 
@@ -237,9 +253,16 @@ public final class OnClientTester extend
                                        throw new IOException(errorMessage);
                                }
                                stream.printf("%d|%d%n", i + 1, i + 1);
+                               if (i % 25 == 0 && stream.checkError()) {
+                                       encounteredWriteError = true;
+                                       break;
+                               }
                        }
                }
 
+               public Object encounteredWriteError() {
+                       return encounteredWriteError;
+               }
        }
 
        static class MyDownloadHandler implements MonetDownloadHandler {
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to