Changeset: b75464874130 for monetdb-java
URL: https://dev.monetdb.org/hg/monetdb-java/rev/b75464874130
Modified Files:
src/main/java/org/monetdb/mcl/net/MapiSocket.java
tests/OnClientTester.java
Branch: onclient
Log Message:
Keep better track of whether the server has cancelled the upload
IOExceptions are suppressed by PrintStreams print/println methods.
This means the client may not realize the server cancelled
and we must suppress all further attempts to write.
Also, the end of upload handshake is different if a cancellation occurred.
diffs (188 lines):
diff --git a/src/main/java/org/monetdb/mcl/net/MapiSocket.java
b/src/main/java/org/monetdb/mcl/net/MapiSocket.java
--- a/src/main/java/org/monetdb/mcl/net/MapiSocket.java
+++ b/src/main/java/org/monetdb/mcl/net/MapiSocket.java
@@ -1194,6 +1194,7 @@ public class MapiSocket { /* cannot (yet
public final static int DEFAULT_CHUNK_SIZE = 1024 * 1024;
private final int chunkSize;
private boolean closed = false;
+ private boolean serverCancelled = false;
private int chunkLeft;
private byte[] promptBuffer;
@@ -1215,6 +1216,12 @@ public class MapiSocket { /* cannot (yet
@Override
public void write(int b) throws IOException {
+ if (serverCancelled) {
+ // We have already thrown an exception and
apparently that has been ignored.
+ // Probably because they're calling print
methods instead of write.
+ // Throw another one, maybe they'll catch this
one.
+ throw new IOException("Server aborted the
upload");
+ }
handleChunking();
super.write(b);
wrote(1);
@@ -1227,6 +1234,12 @@ public class MapiSocket { /* cannot (yet
@Override
public void write(byte[] b, int off, int len) throws
IOException {
+ if (serverCancelled) {
+ // We have already thrown an exception and
apparently that has been ignored.
+ // Probably because they're calling print
methods instead of write.
+ // Throw another one, maybe they'll catch this
one.
+ throw new IOException("Server aborted the
upload");
+ }
while (len > 0) {
handleChunking();
int toWrite = Integer.min(len, chunkLeft);
@@ -1247,6 +1260,15 @@ public class MapiSocket { /* cannot (yet
if (closed) {
return;
}
+ closed = true;
+
+ if (serverCancelled)
+ closeAfterServerCancelled();
+ else
+ closeAfterSuccesfulUpload();
+ }
+
+ private void closeAfterSuccesfulUpload() throws IOException {
if (chunkLeft != chunkSize) {
// flush pending data
flushAndReadPrompt();
@@ -1259,6 +1281,10 @@ public class MapiSocket { /* cannot (yet
}
}
+ private void closeAfterServerCancelled() throws IOException {
+ // nothing to do here, we have already read the error
prompt.
+ }
+
private void wrote(int i) {
chunkLeft -= i;
}
@@ -1278,6 +1304,9 @@ public class MapiSocket { /* cannot (yet
case MORE:
return;
case FILETRANSFER:
+ // Note, if the caller is calling print
methods instead of write, the IO exception gets hidden.
+ // This is unfortunate but there's
nothing we can do about it.
+ serverCancelled = true;
throw new IOException("Server aborted
the upload");
default:
throw new IOException("Expected
MORE/DONE from server, got " + lineType);
diff --git a/tests/OnClientTester.java b/tests/OnClientTester.java
--- a/tests/OnClientTester.java
+++ b/tests/OnClientTester.java
@@ -49,46 +49,58 @@ public final class OnClientTester extend
public void test_Upload() throws Exception {
prepare();
- conn.setUploadHandler(new MyUploadHandler(100));
+ MyUploadHandler handler = new MyUploadHandler(100);
+ conn.setUploadHandler(handler);
update("COPY INTO foo FROM 'banana' ON CLIENT", 100);
+ assertEq("handler encountered write error", false,
handler.encounteredWriteError());
queryInt("SELECT COUNT(*) FROM foo", 100);
}
public void test_ClientRefusesUpload() throws Exception {
prepare();
- conn.setUploadHandler(new MyUploadHandler("immediate error"));
+ MyUploadHandler handler = new MyUploadHandler("immediate
error");
+ conn.setUploadHandler(handler);
expectError("COPY INTO foo FROM 'banana' ON CLIENT", "immediate
error");
+ assertEq("handler encountered write error", false,
handler.encounteredWriteError());
queryInt("SELECT COUNT(*) FROM foo", 0);
}
public void test_Offset0() throws SQLException, Failure {
prepare();
- conn.setUploadHandler(new MyUploadHandler(100));
+ MyUploadHandler handler = new MyUploadHandler(100);
+ conn.setUploadHandler(handler);
update("COPY OFFSET 0 INTO foo FROM 'banana' ON CLIENT", 100);
+ assertEq("handler encountered write error", false,
handler.encounteredWriteError());
queryInt("SELECT MIN(i) FROM foo", 1);
queryInt("SELECT MAX(i) FROM foo", 100);
}
public void test_Offset1() throws SQLException, Failure {
prepare();
- conn.setUploadHandler(new MyUploadHandler(100));
+ MyUploadHandler handler = new MyUploadHandler(100);
+ conn.setUploadHandler(handler);
update("COPY OFFSET 1 INTO foo FROM 'banana' ON CLIENT", 100);
+ assertEq("handler encountered write error", false,
handler.encounteredWriteError());
queryInt("SELECT MIN(i) FROM foo", 1);
queryInt("SELECT MAX(i) FROM foo", 100);
}
public void test_Offset5() throws SQLException, Failure {
prepare();
- conn.setUploadHandler(new MyUploadHandler(100));
+ MyUploadHandler handler = new MyUploadHandler(100);
+ conn.setUploadHandler(handler);
update("COPY OFFSET 5 INTO foo FROM 'banana' ON CLIENT", 96);
+ assertEq("handler encountered write error", false,
handler.encounteredWriteError());
queryInt("SELECT MIN(i) FROM foo", 5);
queryInt("SELECT MAX(i) FROM foo", 100);
}
public void test_ServerStopsReading() throws SQLException, Failure {
prepare();
- conn.setUploadHandler(new MyUploadHandler(100));
- update("COPY 10 RECORDS INTO foo FROM 'banana' ON CLIENT", 96);
+ MyUploadHandler handler = new MyUploadHandler(100);
+ conn.setUploadHandler(handler);
+ update("COPY 10 RECORDS INTO foo FROM 'banana' ON CLIENT", 10);
+ assertEq("handler encountered write error", true,
handler.encounteredWriteError());
// Server stopped reading after 10 rows. Will we stay in sync?
queryInt("SELECT COUNT(i) FROM foo", 10);
}
@@ -125,6 +137,7 @@ public final class OnClientTester extend
conn.setUploadHandler(handler);
handler.setChunkSize(1024 * 1024);
update("COPY INTO foo FROM 'banana' ON CLIENT", n);
+ assertEq("handler encountered write error", false,
handler.encounteredWriteError());
queryInt("SELECT COUNT(DISTINCT i) FROM foo", n);
}
@@ -183,8 +196,10 @@ public final class OnClientTester extend
public void test_FailUploadLate() throws SQLException, Failure {
prepare();
- conn.setUploadHandler(new MyUploadHandler(100, 50, "i don't
like line 50"));
+ MyUploadHandler handler = new MyUploadHandler(100, 50, "i don't
like line 50");
+ conn.setUploadHandler(handler);
expectError("COPY INTO foo FROM 'banana' ON CLIENT", "i don't
like");
+ assertEq("handler encountered write error", false,
handler.encounteredWriteError());
assertEq("connection is closed", true, conn.isClosed());
}
@@ -202,6 +217,7 @@ public final class OnClientTester extend
private final int rows;
private final int errorAt;
private final String errorMessage;
+ private boolean encounteredWriteError;
private int chunkSize = 100; // small number to trigger more
bugs
@@ -237,9 +253,16 @@ public final class OnClientTester extend
throw new IOException(errorMessage);
}
stream.printf("%d|%d%n", i + 1, i + 1);
+ if (i % 25 == 0 && stream.checkError()) {
+ encounteredWriteError = true;
+ break;
+ }
}
}
+ public Object encounteredWriteError() {
+ return encounteredWriteError;
+ }
}
static class MyDownloadHandler implements MonetDownloadHandler {
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list