Changeset: ce2b616ed22e for monetdb-java
URL: https://dev.monetdb.org/hg/monetdb-java/rev/ce2b616ed22e
Modified Files:
        src/main/java/org/monetdb/jdbc/MonetConnection.java
        src/main/java/org/monetdb/mcl/net/MapiSocket.java
        tests/OnClientTester.java
Branch: onclient
Log Message:

Add a cancellation callback to UploadHandler


diffs (191 lines):

diff --git a/src/main/java/org/monetdb/jdbc/MonetConnection.java 
b/src/main/java/org/monetdb/jdbc/MonetConnection.java
--- a/src/main/java/org/monetdb/jdbc/MonetConnection.java
+++ b/src/main/java/org/monetdb/jdbc/MonetConnection.java
@@ -3236,7 +3236,7 @@ public class MonetConnection
                }
 
                long linesToSkip = offset >= 1 ? offset - 1 : 0;
-               Upload handle = new Upload(server);
+               Upload handle = new Upload(server, 
uploadHandler::uploadCancelled);
                boolean wasFaking = server.setInsertFakePrompts(false);
                try {
                        uploadHandler.handleUpload(handle, path, textMode, 
linesToSkip);
@@ -3290,6 +3290,13 @@ public class MonetConnection
                 *                    where both 0 and 1 mean 'upload 
everything'
                 */
                void handleUpload(Upload handle, String name, boolean textMode, 
long linesToSkip) throws IOException;
+
+               /**
+                * Called when the upload is cancelled halfway by the server.
+                *
+                * The default implementation does nothing.
+                */
+               default void uploadCancelled() {}
        }
 
        /**
@@ -3316,12 +3323,14 @@ public class MonetConnection
         */
        public static class Upload {
                private final MapiSocket server;
+               private final Runnable cancellationCallback;
                private PrintStream print = null;
                private String error = null;
                private int customChunkSize = -1;
 
-               Upload(MapiSocket server) {
+               Upload(MapiSocket server, Runnable cancellationCallback) {
                        this.server = server;
+                       this.cancellationCallback = cancellationCallback;
                }
 
                /**
@@ -3362,6 +3371,7 @@ public class MonetConnection
                        if (print == null) {
                                try {
                                        MapiSocket.UploadStream up = 
customChunkSize >= 0 ? server.uploadStream(customChunkSize) : 
server.uploadStream();
+                                       
up.setCancellationCallback(cancellationCallback);
                                        print = new PrintStream(up, false, 
"UTF-8");
                                        up.write('\n');
                                } catch (UnsupportedEncodingException e) {
diff --git a/src/main/java/org/monetdb/mcl/net/MapiSocket.java 
b/src/main/java/org/monetdb/mcl/net/MapiSocket.java
--- a/src/main/java/org/monetdb/mcl/net/MapiSocket.java
+++ b/src/main/java/org/monetdb/mcl/net/MapiSocket.java
@@ -1233,6 +1233,7 @@ public class MapiSocket { /* cannot (yet
                private boolean serverCancelled = false;
                private int chunkLeft;
                private byte[] promptBuffer;
+               private Runnable cancellationCallback = null;
 
                /** Create an UploadStream with the given chunk size */
                UploadStream(int chunkSize) {
@@ -1252,6 +1253,12 @@ public class MapiSocket {        /* cannot (yet
                        this(DEFAULT_CHUNK_SIZE);
                }
 
+               /** Set a callback to be invoked if the server cancels the 
upload
+                */
+               public void setCancellationCallback(Runnable 
cancellationCallback) {
+                       this.cancellationCallback = cancellationCallback;
+               }
+
                @Override
                public void write(int b) throws IOException {
                        if (serverCancelled) {
@@ -1345,6 +1352,9 @@ public class MapiSocket { /* cannot (yet
                                        // Note, if the caller is calling print 
methods instead of write, the IO exception gets hidden.
                                        // This is unfortunate but there's 
nothing we can do about it.
                                        serverCancelled = true;
+                                       if (cancellationCallback != null) {
+                                               cancellationCallback.run();
+                                       }
                                        throw new IOException("Server aborted 
the upload");
                                default:
                                        throw new IOException("Expected 
MORE/DONE from server, got " + lineType);
diff --git a/tests/OnClientTester.java b/tests/OnClientTester.java
--- a/tests/OnClientTester.java
+++ b/tests/OnClientTester.java
@@ -63,7 +63,7 @@ public final class OnClientTester extend
                MyUploadHandler handler = new MyUploadHandler(100);
                conn.setUploadHandler(handler);
                update("COPY INTO foo FROM 'banana' ON CLIENT", 100);
-               assertEq("handler encountered write error", false, 
handler.encounteredWriteError());
+               assertEq("cancellation callback called", false, 
handler.isCancelled());
                queryInt("SELECT COUNT(*) FROM foo", 100);
        }
 
@@ -72,7 +72,7 @@ public final class OnClientTester extend
                MyUploadHandler handler = new MyUploadHandler("immediate 
error");
                conn.setUploadHandler(handler);
                expectError("COPY INTO foo FROM 'banana' ON CLIENT", "immediate 
error");
-               assertEq("handler encountered write error", false, 
handler.encounteredWriteError());
+               assertEq("cancellation callback called", false, 
handler.isCancelled());
                queryInt("SELECT COUNT(*) FROM foo", 0);
        }
 
@@ -81,7 +81,7 @@ public final class OnClientTester extend
                MyUploadHandler handler = new MyUploadHandler(100);
                conn.setUploadHandler(handler);
                update("COPY OFFSET 0 INTO foo FROM 'banana' ON CLIENT", 100);
-               assertEq("handler encountered write error", false, 
handler.encounteredWriteError());
+               assertEq("cancellation callback called", false, 
handler.isCancelled());
                queryInt("SELECT MIN(i) FROM foo", 1);
                queryInt("SELECT MAX(i) FROM foo", 100);
        }
@@ -91,7 +91,7 @@ public final class OnClientTester extend
                MyUploadHandler handler = new MyUploadHandler(100);
                conn.setUploadHandler(handler);
                update("COPY OFFSET 1 INTO foo FROM 'banana' ON CLIENT", 100);
-               assertEq("handler encountered write error", false, 
handler.encounteredWriteError());
+               assertEq("cancellation callback called", false, 
handler.isCancelled());
                queryInt("SELECT MIN(i) FROM foo", 1);
                queryInt("SELECT MAX(i) FROM foo", 100);
        }
@@ -101,7 +101,7 @@ public final class OnClientTester extend
                MyUploadHandler handler = new MyUploadHandler(100);
                conn.setUploadHandler(handler);
                update("COPY OFFSET 5 INTO foo FROM 'banana' ON CLIENT", 96);
-               assertEq("handler encountered write error", false, 
handler.encounteredWriteError());
+               assertEq("cancellation callback called", false, 
handler.isCancelled());
                queryInt("SELECT MIN(i) FROM foo", 5);
                queryInt("SELECT MAX(i) FROM foo", 100);
        }
@@ -111,6 +111,7 @@ public final class OnClientTester extend
                MyUploadHandler handler = new MyUploadHandler(100);
                conn.setUploadHandler(handler);
                update("COPY 10 RECORDS INTO foo FROM 'banana' ON CLIENT", 10);
+               assertEq("cancellation callback called", true, 
handler.isCancelled());
                assertEq("handler encountered write error", true, 
handler.encounteredWriteError());
                // Server stopped reading after 10 rows. Will we stay in sync?
                queryInt("SELECT COUNT(i) FROM foo", 10);
@@ -149,7 +150,7 @@ public final class OnClientTester extend
                conn.setUploadHandler(handler);
                handler.setChunkSize(1024 * 1024);
                update("COPY INTO foo FROM 'banana' ON CLIENT", n);
-               assertEq("handler encountered write error", false, 
handler.encounteredWriteError());
+               assertEq("cancellation callback called", false, 
handler.isCancelled());
                queryInt("SELECT COUNT(DISTINCT i) FROM foo", n);
        }
 
@@ -213,7 +214,7 @@ public final class OnClientTester extend
                MyUploadHandler handler = new MyUploadHandler(100, 50, "i don't 
like line 50");
                conn.setUploadHandler(handler);
                expectError("COPY INTO foo FROM 'banana' ON CLIENT", "i don't 
like");
-               assertEq("handler encountered write error", false, 
handler.encounteredWriteError());
+               assertEq("cancellation callback called", false, 
handler.isCancelled());
                assertEq("connection is closed", true, conn.isClosed());
        }
 
@@ -254,7 +255,8 @@ public final class OnClientTester extend
                private final long rows;
                private final long errorAt;
                private final String errorMessage;
-               private boolean encounteredWriteError;
+               private boolean encounteredWriteError = false;
+               private boolean cancelled = false;
 
                private int chunkSize = 100; // small number to trigger more 
bugs
 
@@ -277,6 +279,11 @@ public final class OnClientTester extend
                }
 
                @Override
+               public void uploadCancelled() {
+                       cancelled = true;
+               }
+
+               @Override
                public void handleUpload(MonetConnection.Upload handle, String 
name, boolean textMode, long linesToSkip) throws IOException {
                        if (errorAt == -1 && errorMessage != null) {
                                handle.sendError(errorMessage);
@@ -299,6 +306,10 @@ public final class OnClientTester extend
                public Object encounteredWriteError() {
                        return encounteredWriteError;
                }
+
+               public boolean isCancelled() {
+                       return cancelled;
+               }
        }
 
        static class MyDownloadHandler implements DownloadHandler {
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to