Changeset: ce2b616ed22e for monetdb-java
URL: https://dev.monetdb.org/hg/monetdb-java/rev/ce2b616ed22e
Modified Files:
src/main/java/org/monetdb/jdbc/MonetConnection.java
src/main/java/org/monetdb/mcl/net/MapiSocket.java
tests/OnClientTester.java
Branch: onclient
Log Message:
Add a cancellation callback to UploadHandler
diffs (191 lines):
diff --git a/src/main/java/org/monetdb/jdbc/MonetConnection.java b/src/main/java/org/monetdb/jdbc/MonetConnection.java
--- a/src/main/java/org/monetdb/jdbc/MonetConnection.java
+++ b/src/main/java/org/monetdb/jdbc/MonetConnection.java
@@ -3236,7 +3236,7 @@ public class MonetConnection
}
long linesToSkip = offset >= 1 ? offset - 1 : 0;
- Upload handle = new Upload(server);
+ Upload handle = new Upload(server, uploadHandler::uploadCancelled);
boolean wasFaking = server.setInsertFakePrompts(false);
try {
uploadHandler.handleUpload(handle, path, textMode,
linesToSkip);
@@ -3290,6 +3290,13 @@ public class MonetConnection
* where both 0 and 1 mean 'upload
everything'
*/
void handleUpload(Upload handle, String name, boolean textMode,
long linesToSkip) throws IOException;
+
+ /**
+ * Called when the upload is cancelled halfway by the server.
+ *
+ * The default implementation does nothing.
+ */
+ default void uploadCancelled() {}
}
/**
@@ -3316,12 +3323,14 @@ public class MonetConnection
*/
public static class Upload {
private final MapiSocket server;
+ private final Runnable cancellationCallback;
private PrintStream print = null;
private String error = null;
private int customChunkSize = -1;
- Upload(MapiSocket server) {
+ Upload(MapiSocket server, Runnable cancellationCallback) {
this.server = server;
+ this.cancellationCallback = cancellationCallback;
}
/**
@@ -3362,6 +3371,7 @@ public class MonetConnection
if (print == null) {
try {
MapiSocket.UploadStream up =
customChunkSize >= 0 ? server.uploadStream(customChunkSize) :
server.uploadStream();
+ up.setCancellationCallback(cancellationCallback);
print = new PrintStream(up, false,
"UTF-8");
up.write('\n');
} catch (UnsupportedEncodingException e) {
diff --git a/src/main/java/org/monetdb/mcl/net/MapiSocket.java b/src/main/java/org/monetdb/mcl/net/MapiSocket.java
--- a/src/main/java/org/monetdb/mcl/net/MapiSocket.java
+++ b/src/main/java/org/monetdb/mcl/net/MapiSocket.java
@@ -1233,6 +1233,7 @@ public class MapiSocket { /* cannot (yet
private boolean serverCancelled = false;
private int chunkLeft;
private byte[] promptBuffer;
+ private Runnable cancellationCallback = null;
/** Create an UploadStream with the given chunk size */
UploadStream(int chunkSize) {
@@ -1252,6 +1253,12 @@ public class MapiSocket { /* cannot (yet
this(DEFAULT_CHUNK_SIZE);
}
+ /** Set a callback to be invoked if the server cancels the upload
+ */
+ public void setCancellationCallback(Runnable cancellationCallback) {
+ this.cancellationCallback = cancellationCallback;
+ }
+
@Override
public void write(int b) throws IOException {
if (serverCancelled) {
@@ -1345,6 +1352,9 @@ public class MapiSocket { /* cannot (yet
// Note, if the caller is calling print
methods instead of write, the IO exception gets hidden.
// This is unfortunate but there's
nothing we can do about it.
serverCancelled = true;
+ if (cancellationCallback != null) {
+ cancellationCallback.run();
+ }
throw new IOException("Server aborted the upload");
default:
throw new IOException("Expected
MORE/DONE from server, got " + lineType);
diff --git a/tests/OnClientTester.java b/tests/OnClientTester.java
--- a/tests/OnClientTester.java
+++ b/tests/OnClientTester.java
@@ -63,7 +63,7 @@ public final class OnClientTester extend
MyUploadHandler handler = new MyUploadHandler(100);
conn.setUploadHandler(handler);
update("COPY INTO foo FROM 'banana' ON CLIENT", 100);
- assertEq("handler encountered write error", false, handler.encounteredWriteError());
+ assertEq("cancellation callback called", false, handler.isCancelled());
queryInt("SELECT COUNT(*) FROM foo", 100);
}
@@ -72,7 +72,7 @@ public final class OnClientTester extend
MyUploadHandler handler = new MyUploadHandler("immediate
error");
conn.setUploadHandler(handler);
expectError("COPY INTO foo FROM 'banana' ON CLIENT", "immediate
error");
- assertEq("handler encountered write error", false, handler.encounteredWriteError());
+ assertEq("cancellation callback called", false, handler.isCancelled());
queryInt("SELECT COUNT(*) FROM foo", 0);
}
@@ -81,7 +81,7 @@ public final class OnClientTester extend
MyUploadHandler handler = new MyUploadHandler(100);
conn.setUploadHandler(handler);
update("COPY OFFSET 0 INTO foo FROM 'banana' ON CLIENT", 100);
- assertEq("handler encountered write error", false, handler.encounteredWriteError());
+ assertEq("cancellation callback called", false, handler.isCancelled());
queryInt("SELECT MIN(i) FROM foo", 1);
queryInt("SELECT MAX(i) FROM foo", 100);
}
@@ -91,7 +91,7 @@ public final class OnClientTester extend
MyUploadHandler handler = new MyUploadHandler(100);
conn.setUploadHandler(handler);
update("COPY OFFSET 1 INTO foo FROM 'banana' ON CLIENT", 100);
- assertEq("handler encountered write error", false, handler.encounteredWriteError());
+ assertEq("cancellation callback called", false, handler.isCancelled());
queryInt("SELECT MIN(i) FROM foo", 1);
queryInt("SELECT MAX(i) FROM foo", 100);
}
@@ -101,7 +101,7 @@ public final class OnClientTester extend
MyUploadHandler handler = new MyUploadHandler(100);
conn.setUploadHandler(handler);
update("COPY OFFSET 5 INTO foo FROM 'banana' ON CLIENT", 96);
- assertEq("handler encountered write error", false, handler.encounteredWriteError());
+ assertEq("cancellation callback called", false, handler.isCancelled());
queryInt("SELECT MIN(i) FROM foo", 5);
queryInt("SELECT MAX(i) FROM foo", 100);
}
@@ -111,6 +111,7 @@ public final class OnClientTester extend
MyUploadHandler handler = new MyUploadHandler(100);
conn.setUploadHandler(handler);
update("COPY 10 RECORDS INTO foo FROM 'banana' ON CLIENT", 10);
+ assertEq("cancellation callback called", true, handler.isCancelled());
assertEq("handler encountered write error", true, handler.encounteredWriteError());
// Server stopped reading after 10 rows. Will we stay in sync?
queryInt("SELECT COUNT(i) FROM foo", 10);
@@ -149,7 +150,7 @@ public final class OnClientTester extend
conn.setUploadHandler(handler);
handler.setChunkSize(1024 * 1024);
update("COPY INTO foo FROM 'banana' ON CLIENT", n);
- assertEq("handler encountered write error", false, handler.encounteredWriteError());
+ assertEq("cancellation callback called", false, handler.isCancelled());
queryInt("SELECT COUNT(DISTINCT i) FROM foo", n);
}
@@ -213,7 +214,7 @@ public final class OnClientTester extend
MyUploadHandler handler = new MyUploadHandler(100, 50, "i don't
like line 50");
conn.setUploadHandler(handler);
expectError("COPY INTO foo FROM 'banana' ON CLIENT", "i don't
like");
- assertEq("handler encountered write error", false, handler.encounteredWriteError());
+ assertEq("cancellation callback called", false, handler.isCancelled());
assertEq("connection is closed", true, conn.isClosed());
}
@@ -254,7 +255,8 @@ public final class OnClientTester extend
private final long rows;
private final long errorAt;
private final String errorMessage;
- private boolean encounteredWriteError;
+ private boolean encounteredWriteError = false;
+ private boolean cancelled = false;
private int chunkSize = 100; // small number to trigger more
bugs
@@ -277,6 +279,11 @@ public final class OnClientTester extend
}
@Override
+ public void uploadCancelled() {
+ cancelled = true;
+ }
+
+ @Override
public void handleUpload(MonetConnection.Upload handle, String
name, boolean textMode, long linesToSkip) throws IOException {
if (errorAt == -1 && errorMessage != null) {
handle.sendError(errorMessage);
@@ -299,6 +306,10 @@ public final class OnClientTester extend
public Object encounteredWriteError() {
return encounteredWriteError;
}
+
+ public boolean isCancelled() {
+ return cancelled;
+ }
}
static class MyDownloadHandler implements DownloadHandler {
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list