Repository: calcite Updated Branches: refs/heads/master 82fd259c6 -> 322b97300
http://git-wip-us.apache.org/repos/asf/calcite/blob/322b9730/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java index 89ed01e..c6888f4 100644 --- a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java +++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java @@ -68,6 +68,8 @@ public interface Service { CloseConnectionResponse apply(CloseConnectionRequest request); ConnectionSyncResponse apply(ConnectionSyncRequest request); DatabasePropertyResponse apply(DatabasePropertyRequest request); + CommitResponse apply(CommitRequest request); + RollbackResponse apply(RollbackRequest request); /** * Sets server-level metadata for RPCs. This includes information that is static across all RPCs. @@ -108,7 +110,9 @@ public interface Service { name = "closeConnection"), @JsonSubTypes.Type(value = ConnectionSyncRequest.class, name = "connectionSync"), @JsonSubTypes.Type(value = DatabasePropertyRequest.class, name = "databaseProperties"), - @JsonSubTypes.Type(value = SyncResultsRequest.class, name = "syncResults") }) + @JsonSubTypes.Type(value = SyncResultsRequest.class, name = "syncResults"), + @JsonSubTypes.Type(value = CommitRequest.class, name = "commit"), + @JsonSubTypes.Type(value = RollbackRequest.class, name = "rollback") }) abstract class Request { abstract Response accept(Service service); abstract Request deserialize(Message genericMsg); @@ -136,7 +140,9 @@ public interface Service { @JsonSubTypes.Type(value = ExecuteResponse.class, name = "executeResults"), @JsonSubTypes.Type(value = ErrorResponse.class, name = "error"), @JsonSubTypes.Type(value = SyncResultsResponse.class, name = "syncResults"), - @JsonSubTypes.Type(value = RpcMetadataResponse.class, name = "rpcMetadata") }) + 
@JsonSubTypes.Type(value = RpcMetadataResponse.class, name = "rpcMetadata"), + @JsonSubTypes.Type(value = CommitResponse.class, name = "commit"), + @JsonSubTypes.Type(value = RollbackResponse.class, name = "rollback") }) abstract class Response { abstract Response deserialize(Message genericMsg); abstract Message serialize(); @@ -161,12 +167,8 @@ public interface Service { } @Override CatalogsRequest deserialize(Message genericMsg) { - if (!(genericMsg instanceof Requests.CatalogsRequest)) { - throw new IllegalArgumentException( - "Expected CatalogsRequest, but got " + genericMsg.getClass().getName()); - } - - final Requests.CatalogsRequest msg = (Requests.CatalogsRequest) genericMsg; + final Requests.CatalogsRequest msg = ProtobufService.castProtobufMessage(genericMsg, + Requests.CatalogsRequest.class); final Descriptor desc = msg.getDescriptorForType(); String connectionId = null; @@ -234,12 +236,8 @@ public interface Service { } @Override DatabasePropertyRequest deserialize(Message genericMsg) { - if (!(genericMsg instanceof Requests.DatabasePropertyRequest)) { - throw new IllegalArgumentException( - "Expected DatabasePropertyRequest, but got " + genericMsg.getClass().getName()); - } - - final Requests.DatabasePropertyRequest msg = (Requests.DatabasePropertyRequest) genericMsg; + final Requests.DatabasePropertyRequest msg = ProtobufService.castProtobufMessage(genericMsg, + Requests.DatabasePropertyRequest.class); final Descriptor desc = msg.getDescriptorForType(); String connectionId = null; @@ -315,12 +313,8 @@ public interface Service { } @Override SchemasRequest deserialize(Message genericMsg) { - if (!(genericMsg instanceof Requests.SchemasRequest)) { - throw new IllegalArgumentException( - "Expected SchemasRequest, but got" + genericMsg.getClass().getName()); - } - - final Requests.SchemasRequest msg = (Requests.SchemasRequest) genericMsg; + final Requests.SchemasRequest msg = ProtobufService.castProtobufMessage(genericMsg, + Requests.SchemasRequest.class); 
final Descriptor desc = msg.getDescriptorForType(); String connectionId = null; @@ -442,12 +436,8 @@ public interface Service { } @Override Request deserialize(Message genericMsg) { - if (!(genericMsg instanceof Requests.TablesRequest)) { - throw new IllegalArgumentException( - "Expected TablesRequest, but got " + genericMsg.getClass().getName()); - } - - final Requests.TablesRequest msg = (Requests.TablesRequest) genericMsg; + final Requests.TablesRequest msg = ProtobufService.castProtobufMessage(genericMsg, + Requests.TablesRequest.class); final Descriptor desc = msg.getDescriptorForType(); String connectionId = null; @@ -591,12 +581,8 @@ public interface Service { } @Override TableTypesRequest deserialize(Message genericMsg) { - if (!(genericMsg instanceof Requests.TableTypesRequest)) { - throw new IllegalArgumentException( - "Expected TableTypesRequest, but got " + genericMsg.getClass().getName()); - } - - final Requests.TableTypesRequest msg = (Requests.TableTypesRequest) genericMsg; + final Requests.TableTypesRequest msg = ProtobufService.castProtobufMessage(genericMsg, + Requests.TableTypesRequest.class); final Descriptor desc = msg.getDescriptorForType(); String connectionId = null; @@ -679,12 +665,8 @@ public interface Service { } @Override ColumnsRequest deserialize(Message genericMsg) { - if (!(genericMsg instanceof Requests.ColumnsRequest)) { - throw new IllegalArgumentException( - "Expected ColumnsRequest, but got" + genericMsg.getClass().getName()); - } - - final Requests.ColumnsRequest msg = (Requests.ColumnsRequest) genericMsg; + final Requests.ColumnsRequest msg = ProtobufService.castProtobufMessage(genericMsg, + Requests.ColumnsRequest.class); final Descriptor desc = msg.getDescriptorForType(); String connectionId = null; @@ -825,12 +807,8 @@ public interface Service { } @Override TypeInfoRequest deserialize(Message genericMsg) { - if (!(genericMsg instanceof Requests.TypeInfoRequest)) { - throw new IllegalArgumentException( - "Expected 
TypeInfoRequest, but got " + genericMsg.getClass().getName()); - } - - final Requests.TypeInfoRequest msg = (Requests.TypeInfoRequest) genericMsg; + final Requests.TypeInfoRequest msg = ProtobufService.castProtobufMessage(genericMsg, + Requests.TypeInfoRequest.class); final Descriptor desc = msg.getDescriptorForType(); String connectionId = null; @@ -927,12 +905,10 @@ public interface Service { } @Override ResultSetResponse deserialize(Message genericMsg) { - if (!(genericMsg instanceof Responses.ResultSetResponse)) { - throw new IllegalArgumentException( - "Expected ResultSetResponse, but got " + genericMsg.getClass().getName()); - } + final Responses.ResultSetResponse msg = ProtobufService.castProtobufMessage(genericMsg, + Responses.ResultSetResponse.class); - return fromProto((Responses.ResultSetResponse) genericMsg); + return fromProto(msg); } static ResultSetResponse fromProto(Responses.ResultSetResponse msg) { @@ -1080,12 +1056,8 @@ public interface Service { } @Override PrepareAndExecuteRequest deserialize(Message genericMsg) { - if (!(genericMsg instanceof Requests.PrepareAndExecuteRequest)) { - throw new IllegalArgumentException( - "Expected PrepareAndExecuteRequest, but got " + genericMsg.getClass().getName()); - } - - final Requests.PrepareAndExecuteRequest msg = (Requests.PrepareAndExecuteRequest) genericMsg; + final Requests.PrepareAndExecuteRequest msg = ProtobufService.castProtobufMessage(genericMsg, + Requests.PrepareAndExecuteRequest.class); final Descriptor desc = msg.getDescriptorForType(); String connectionId = null; @@ -1187,12 +1159,8 @@ public interface Service { } @Override ExecuteRequest deserialize(Message genericMsg) { - if (!(genericMsg instanceof Requests.ExecuteRequest)) { - throw new IllegalArgumentException( - "Expected ExecuteRequest, but got " + genericMsg.getClass().getName()); - } - - final Requests.ExecuteRequest msg = (Requests.ExecuteRequest) genericMsg; + final Requests.ExecuteRequest msg = 
ProtobufService.castProtobufMessage(genericMsg, + Requests.ExecuteRequest.class); final Descriptor desc = msg.getDescriptorForType(); Meta.StatementHandle statemetnHandle = null; @@ -1297,12 +1265,8 @@ public interface Service { } @Override ExecuteResponse deserialize(Message genericMsg) { - if (!(genericMsg instanceof Responses.ExecuteResponse)) { - throw new IllegalArgumentException( - "Expected ExecuteResponse, but got " + genericMsg.getClass().getName()); - } - - Responses.ExecuteResponse msg = (Responses.ExecuteResponse) genericMsg; + final Responses.ExecuteResponse msg = ProtobufService.castProtobufMessage(genericMsg, + Responses.ExecuteResponse.class); final Descriptor desc = msg.getDescriptorForType(); List<Responses.ResultSetResponse> msgResults = msg.getResultsList(); @@ -1402,12 +1366,8 @@ public interface Service { } @Override PrepareRequest deserialize(Message genericMsg) { - if (!(genericMsg instanceof Requests.PrepareRequest)) { - throw new IllegalArgumentException( - "Expected PrepareRequest, but got " + genericMsg.getClass().getName()); - } - - final Requests.PrepareRequest msg = (Requests.PrepareRequest) genericMsg; + final Requests.PrepareRequest msg = ProtobufService.castProtobufMessage(genericMsg, + Requests.PrepareRequest.class); final Descriptor desc = msg.getDescriptorForType(); String connectionId = null; @@ -1496,13 +1456,9 @@ public interface Service { } @Override PrepareResponse deserialize(Message genericMsg) { - if (!(genericMsg instanceof Responses.PrepareResponse)) { - throw new IllegalArgumentException( - "Expected PrepareResponse, but got " + genericMsg.getClass().getName()); - } - - Responses.PrepareResponse msg = (Responses.PrepareResponse) genericMsg; - Descriptor desc = msg.getDescriptorForType(); + final Responses.PrepareResponse msg = ProtobufService.castProtobufMessage(genericMsg, + Responses.PrepareResponse.class); + final Descriptor desc = msg.getDescriptorForType(); RpcMetadataResponse metadata = null; if 
(ProtobufService.hasField(msg, desc, Responses.PrepareResponse.METADATA_FIELD_NUMBER)) { @@ -1598,12 +1554,8 @@ public interface Service { } @Override FetchRequest deserialize(Message genericMsg) { - if (!(genericMsg instanceof Requests.FetchRequest)) { - throw new IllegalArgumentException( - "Expected FetchRequest, but got " + genericMsg.getClass().getName()); - } - - final Requests.FetchRequest msg = (Requests.FetchRequest) genericMsg; + final Requests.FetchRequest msg = ProtobufService.castProtobufMessage(genericMsg, + Requests.FetchRequest.class); final Descriptor desc = msg.getDescriptorForType(); String connectionId = null; @@ -1688,13 +1640,9 @@ public interface Service { } @Override FetchResponse deserialize(Message genericMsg) { - if (!(genericMsg instanceof Responses.FetchResponse)) { - throw new IllegalArgumentException( - "Expected FetchResponse, but got" + genericMsg.getClass().getName()); - } - - Responses.FetchResponse msg = (Responses.FetchResponse) genericMsg; - Descriptor desc = msg.getDescriptorForType(); + final Responses.FetchResponse msg = ProtobufService.castProtobufMessage(genericMsg, + Responses.FetchResponse.class); + final Descriptor desc = msg.getDescriptorForType(); RpcMetadataResponse metadata = null; if (ProtobufService.hasField(msg, desc, Responses.FetchResponse.METADATA_FIELD_NUMBER)) { @@ -1778,12 +1726,8 @@ public interface Service { } @Override CreateStatementRequest deserialize(Message genericMsg) { - if (!(genericMsg instanceof Requests.CreateStatementRequest)) { - throw new IllegalArgumentException( - "Expected CreateStatementRequest, but got" + genericMsg.getClass().getName()); - } - - final Requests.CreateStatementRequest msg = (Requests.CreateStatementRequest) genericMsg; + final Requests.CreateStatementRequest msg = ProtobufService.castProtobufMessage(genericMsg, + Requests.CreateStatementRequest.class); final Descriptor desc = msg.getDescriptorForType(); String connectionId = null; @@ -1859,12 +1803,8 @@ public interface 
Service { } @Override CreateStatementResponse deserialize(Message genericMsg) { - if (!(genericMsg instanceof Responses.CreateStatementResponse)) { - throw new IllegalArgumentException( - "Expected CreateStatementResponse, but got " + genericMsg.getClass().getName()); - } - - final Responses.CreateStatementResponse msg = (Responses.CreateStatementResponse) genericMsg; + final Responses.CreateStatementResponse msg = ProtobufService.castProtobufMessage(genericMsg, + Responses.CreateStatementResponse.class); final Descriptor desc = msg.getDescriptorForType(); String connectionId = null; @@ -1962,12 +1902,8 @@ public interface Service { } @Override CloseStatementRequest deserialize(Message genericMsg) { - if (!(genericMsg instanceof Requests.CloseStatementRequest)) { - throw new IllegalArgumentException( - "Expected CloseStatementRequest, but got " + genericMsg.getClass().getName()); - } - - final Requests.CloseStatementRequest msg = (Requests.CloseStatementRequest) genericMsg; + final Requests.CloseStatementRequest msg = ProtobufService.castProtobufMessage(genericMsg, + Requests.CloseStatementRequest.class); final Descriptor desc = msg.getDescriptorForType(); String connectionId = null; @@ -2034,13 +1970,9 @@ public interface Service { } @Override CloseStatementResponse deserialize(Message genericMsg) { - if (!(genericMsg instanceof Responses.CloseStatementResponse)) { - throw new IllegalArgumentException( - "Expected CloseStatementResponse, but got " + genericMsg.getClass().getName()); - } - - Responses.CloseStatementResponse msg = (Responses.CloseStatementResponse) genericMsg; - Descriptor desc = msg.getDescriptorForType(); + final Responses.CloseStatementResponse msg = ProtobufService.castProtobufMessage(genericMsg, + Responses.CloseStatementResponse.class); + final Descriptor desc = msg.getDescriptorForType(); RpcMetadataResponse metadata = null; if (ProtobufService.hasField(msg, desc, @@ -2120,12 +2052,8 @@ public interface Service { } @Override Request 
deserialize(Message genericMsg) { - if (!(genericMsg instanceof Requests.OpenConnectionRequest)) { - throw new IllegalArgumentException( - "Expected OpenConnectionRequest, but got" + genericMsg.getClass().getName()); - } - - final Requests.OpenConnectionRequest msg = (Requests.OpenConnectionRequest) genericMsg; + final Requests.OpenConnectionRequest msg = ProtobufService.castProtobufMessage(genericMsg, + Requests.OpenConnectionRequest.class); final Descriptor desc = msg.getDescriptorForType(); String connectionId = null; @@ -2207,13 +2135,9 @@ public interface Service { } @Override OpenConnectionResponse deserialize(Message genericMsg) { - if (!(genericMsg instanceof Responses.OpenConnectionResponse)) { - throw new IllegalArgumentException( - "Expected OpenConnectionResponse, but got " + genericMsg.getClass().getName()); - } - - Responses.OpenConnectionResponse msg = (Responses.OpenConnectionResponse) genericMsg; - Descriptor desc = msg.getDescriptorForType(); + final Responses.OpenConnectionResponse msg = ProtobufService.castProtobufMessage(genericMsg, + Responses.OpenConnectionResponse.class); + final Descriptor desc = msg.getDescriptorForType(); RpcMetadataResponse metadata = null; if (ProtobufService.hasField(msg, desc, @@ -2265,12 +2189,8 @@ public interface Service { } @Override CloseConnectionRequest deserialize(Message genericMsg) { - if (!(genericMsg instanceof Requests.CloseConnectionRequest)) { - throw new IllegalArgumentException( - "Expected CloseConnectionRequest, but got " + genericMsg.getClass().getName()); - } - - final Requests.CloseConnectionRequest msg = (Requests.CloseConnectionRequest) genericMsg; + final Requests.CloseConnectionRequest msg = ProtobufService.castProtobufMessage(genericMsg, + Requests.CloseConnectionRequest.class); final Descriptor desc = msg.getDescriptorForType(); String connectionId = null; @@ -2321,13 +2241,9 @@ public interface Service { } @Override CloseConnectionResponse deserialize(Message genericMsg) { - if 
(!(genericMsg instanceof Responses.CloseConnectionResponse)) { - throw new IllegalArgumentException( - "Expected CloseConnectionResponse, but got " + genericMsg.getClass().getName()); - } - - Responses.CloseConnectionResponse msg = (Responses.CloseConnectionResponse) genericMsg; - Descriptor desc = msg.getDescriptorForType(); + final Responses.CloseConnectionResponse msg = ProtobufService.castProtobufMessage(genericMsg, + Responses.CloseConnectionResponse.class); + final Descriptor desc = msg.getDescriptorForType(); RpcMetadataResponse metadata = null; if (ProtobufService.hasField(msg, desc, @@ -2382,12 +2298,8 @@ public interface Service { } @Override ConnectionSyncRequest deserialize(Message genericMsg) { - if (!(genericMsg instanceof Requests.ConnectionSyncRequest)) { - throw new IllegalArgumentException( - "Expected ConnectionSyncRequest, but got " + genericMsg.getClass().getName()); - } - - final Requests.ConnectionSyncRequest msg = (Requests.ConnectionSyncRequest) genericMsg; + final Requests.ConnectionSyncRequest msg = ProtobufService.castProtobufMessage(genericMsg, + Requests.ConnectionSyncRequest.class); final Descriptor desc = msg.getDescriptorForType(); String connectionId = null; @@ -2476,13 +2388,9 @@ public interface Service { } @Override ConnectionSyncResponse deserialize(Message genericMsg) { - if (!(genericMsg instanceof Responses.ConnectionSyncResponse)) { - throw new IllegalArgumentException( - "Expected ConnectionSyncResponse, but got " + genericMsg.getClass().getName()); - } - - Responses.ConnectionSyncResponse msg = (Responses.ConnectionSyncResponse) genericMsg; - Descriptor desc = msg.getDescriptorForType(); + final Responses.ConnectionSyncResponse msg = ProtobufService.castProtobufMessage(genericMsg, + Responses.ConnectionSyncResponse.class); + final Descriptor desc = msg.getDescriptorForType(); RpcMetadataResponse metadata = null; if (ProtobufService.hasField(msg, desc, @@ -2566,13 +2474,9 @@ public interface Service { } @Override 
DatabasePropertyResponse deserialize(Message genericMsg) { - if (!(genericMsg instanceof Responses.DatabasePropertyResponse)) { - throw new IllegalArgumentException( - "Expected DatabasePropertyResponse, but got " + genericMsg.getClass().getName()); - } - - Responses.DatabasePropertyResponse msg = (Responses.DatabasePropertyResponse) genericMsg; - Descriptor desc = msg.getDescriptorForType(); + final Responses.DatabasePropertyResponse msg = ProtobufService.castProtobufMessage(genericMsg, + Responses.DatabasePropertyResponse.class); + final Descriptor desc = msg.getDescriptorForType(); HashMap<Meta.DatabaseProperty, Object> properties = new HashMap<>(); for (Responses.DatabasePropertyElement property : msg.getPropsList()) { @@ -2781,13 +2685,9 @@ public interface Service { } @Override ErrorResponse deserialize(Message genericMsg) { - if (!(genericMsg instanceof Responses.ErrorResponse)) { - throw new IllegalArgumentException("Expected ErrorResponse, but got " - + genericMsg.getClass()); - } - - Responses.ErrorResponse msg = (Responses.ErrorResponse) genericMsg; - Descriptor desc = msg.getDescriptorForType(); + final Responses.ErrorResponse msg = ProtobufService.castProtobufMessage(genericMsg, + Responses.ErrorResponse.class); + final Descriptor desc = msg.getDescriptorForType(); List<String> exceptions = null; if (msg.getHasExceptions()) { @@ -2957,15 +2857,34 @@ public interface Service { } Request deserialize(Message genericMsg) { - if (!(genericMsg instanceof Requests.SyncResultsRequest)) { - throw new IllegalArgumentException( - "Expected SyncResultsRequest, but got " + genericMsg.getClass().getName()); + final Requests.SyncResultsRequest msg = ProtobufService.castProtobufMessage(genericMsg, + Requests.SyncResultsRequest.class); + final Descriptor desc = msg.getDescriptorForType(); + + String connectionId = null; + if (ProtobufService.hasField(msg, desc, + Requests.SyncResultsRequest.CONNECTION_ID_FIELD_NUMBER)) { + connectionId = msg.getConnectionId(); + } + + 
int statementId = 0; + if (ProtobufService.hasField(msg, desc, + Requests.SyncResultsRequest.STATEMENT_ID_FIELD_NUMBER)) { + statementId = msg.getStatementId(); + } + + Common.QueryState state = null; + if (ProtobufService.hasField(msg, desc, Requests.SyncResultsRequest.STATE_FIELD_NUMBER)) { + state = msg.getState(); } - Requests.SyncResultsRequest msg = (Requests.SyncResultsRequest) genericMsg; + long offset = 0; + if (ProtobufService.hasField(msg, desc, Requests.SyncResultsRequest.OFFSET_FIELD_NUMBER)) { + offset = msg.getOffset(); + } - return new SyncResultsRequest(msg.getConnectionId(), msg.getStatementId(), - QueryState.fromProto(msg.getState()), msg.getOffset()); + return new SyncResultsRequest(connectionId, statementId, + null == state ? null : QueryState.fromProto(msg.getState()), offset); } Requests.SyncResultsRequest serialize() { @@ -3056,13 +2975,9 @@ public interface Service { } SyncResultsResponse deserialize(Message genericMsg) { - if (!(genericMsg instanceof Responses.SyncResultsResponse)) { - throw new IllegalArgumentException( - "Expected SyncResultsResponse, but got " + genericMsg.getClass().getName()); - } - - Responses.SyncResultsResponse msg = (Responses.SyncResultsResponse) genericMsg; - Descriptor desc = msg.getDescriptorForType(); + final Responses.SyncResultsResponse msg = ProtobufService.castProtobufMessage(genericMsg, + Responses.SyncResultsResponse.class); + final Descriptor desc = msg.getDescriptorForType(); RpcMetadataResponse metadata = null; if (ProtobufService.hasField(msg, desc, @@ -3132,12 +3047,10 @@ public interface Service { } @Override RpcMetadataResponse deserialize(Message genericMsg) { - if (!(genericMsg instanceof Responses.RpcMetadata)) { - throw new IllegalArgumentException("Expected RpcMetadata, but got " - + genericMsg.getClass().getName()); - } + final Responses.RpcMetadata msg = ProtobufService.castProtobufMessage(genericMsg, + Responses.RpcMetadata.class); - return fromProto((Responses.RpcMetadata) genericMsg); + 
return fromProto(msg); } @Override Responses.RpcMetadata serialize() { @@ -3145,7 +3058,7 @@ public interface Service { } static RpcMetadataResponse fromProto(Responses.RpcMetadata msg) { - Descriptor desc = msg.getDescriptorForType(); + final Descriptor desc = msg.getDescriptorForType(); String serverAddress = null; if (ProtobufService.hasField(msg, desc, Responses.RpcMetadata.SERVER_ADDRESS_FIELD_NUMBER)) { @@ -3167,6 +3080,186 @@ public interface Service { && Objects.equals(serverAddress, ((RpcMetadataResponse) obj).serverAddress)); } } + + /** + * An RPC request to invoke a commit on a Connection. + */ + class CommitRequest extends Request { + public final String connectionId; + + CommitRequest() { + this.connectionId = null; + } + + public CommitRequest(@JsonProperty("connectionId") String connectionId) { + this.connectionId = connectionId; + } + + @Override + CommitResponse accept(Service service) { + return service.apply(this); + } + + @Override + CommitRequest deserialize(Message genericMsg) { + final Requests.CommitRequest msg = ProtobufService.castProtobufMessage(genericMsg, + Requests.CommitRequest.class); + final Descriptor desc = msg.getDescriptorForType(); + + String connectionId = null; + if (ProtobufService.hasField(msg, desc, Requests.CommitRequest.CONNECTION_ID_FIELD_NUMBER)) { + connectionId = msg.getConnectionId(); + } + + return new CommitRequest(connectionId); + } + + @Override + Requests.CommitRequest serialize() { + Requests.CommitRequest.Builder builder = Requests.CommitRequest.newBuilder(); + + if (null != connectionId) { + builder.setConnectionId(connectionId); + } + + return builder.build(); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((connectionId == null) ? 
0 : connectionId.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + return this == obj || (obj instanceof CommitRequest + && Objects.equals(connectionId, ((CommitRequest) obj).connectionId)); + } + + } + + /** + * An RPC response from invoking commit on a Connection. + */ + class CommitResponse extends Response { + + CommitResponse() {} + + @Override + CommitResponse deserialize(Message genericMsg) { + // Checks the type of genericMsg + ProtobufService.castProtobufMessage(genericMsg, Responses.CommitResponse.class); + + return new CommitResponse(); + } + + @Override + Responses.CommitResponse serialize() { + return Responses.CommitResponse.newBuilder().build(); + } + + + @Override + public int hashCode() { + return 0; + } + + @Override + public boolean equals(Object obj) { + return this == obj || obj instanceof CommitResponse; + } + } + + /** + * An RPC request to invoke a rollback on a Connection. + */ + class RollbackRequest extends Request { + public final String connectionId; + + RollbackRequest() { + this.connectionId = null; + } + + public RollbackRequest(@JsonProperty("connectionId") String connectionId) { + this.connectionId = connectionId; + } + + @Override RollbackResponse accept(Service service) { + return service.apply(this); + } + + @Override RollbackRequest deserialize(Message genericMsg) { + final Requests.RollbackRequest msg = ProtobufService.castProtobufMessage(genericMsg, + Requests.RollbackRequest.class); + final Descriptor desc = msg.getDescriptorForType(); + + String connectionId = null; + if (ProtobufService.hasField(msg, desc, + Requests.RollbackRequest.CONNECTION_ID_FIELD_NUMBER)) { + connectionId = msg.getConnectionId(); + } + + return new RollbackRequest(connectionId); + } + + @Override Requests.RollbackRequest serialize() { + Requests.RollbackRequest.Builder builder = Requests.RollbackRequest.newBuilder(); + + if (null != connectionId) { + builder.setConnectionId(connectionId); + } + + return 
builder.build(); + } + + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((connectionId == null) ? 0 : connectionId.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + return this == obj || (obj instanceof RollbackRequest + && Objects.equals(connectionId, ((RollbackRequest) obj).connectionId)); + } + } + + /** + * An RPC response from invoking rollback on a Connection. + */ + class RollbackResponse extends Response { + + RollbackResponse() {} + + @Override RollbackResponse deserialize(Message genericMsg) { + // Check that genericMsg is the expected type + ProtobufService.castProtobufMessage(genericMsg, Responses.RollbackResponse.class); + return new RollbackResponse(); + } + + @Override Responses.RollbackResponse serialize() { + return Responses.RollbackResponse.newBuilder().build(); + } + + + @Override + public int hashCode() { + return 0; + } + + @Override + public boolean equals(Object obj) { + return this == obj || obj instanceof RollbackResponse; + } + } + } // End Service.java http://git-wip-us.apache.org/repos/asf/calcite/blob/322b9730/avatica/src/main/protobuf/requests.proto ---------------------------------------------------------------------- diff --git a/avatica/src/main/protobuf/requests.proto b/avatica/src/main/protobuf/requests.proto index 4201e3b..31b0941 100644 --- a/avatica/src/main/protobuf/requests.proto +++ b/avatica/src/main/protobuf/requests.proto @@ -133,4 +133,13 @@ message SyncResultsRequest { QueryState state = 3; uint64 offset = 4; } - + +// Request to invoke a commit on a Connection +message CommitRequest { + string connection_id = 1; +} + +// Request to invoke rollback on a Connection +message RollbackRequest { + string connection_id = 1; +} http://git-wip-us.apache.org/repos/asf/calcite/blob/322b9730/avatica/src/main/protobuf/responses.proto ---------------------------------------------------------------------- diff --git 
a/avatica/src/main/protobuf/responses.proto b/avatica/src/main/protobuf/responses.proto index 65c702e..01a62ed 100644 --- a/avatica/src/main/protobuf/responses.proto +++ b/avatica/src/main/protobuf/responses.proto @@ -113,4 +113,14 @@ message SyncResultsResponse { // Generic metadata for the server to return with each response. message RpcMetadata { string server_address = 1; // The host:port of the server -} \ No newline at end of file +} + +// Response to a commit request +message CommitResponse { + +} + +// Response to a rollback request +message RollbackResponse { + +} http://git-wip-us.apache.org/repos/asf/calcite/blob/322b9730/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufServiceTest.java ---------------------------------------------------------------------- diff --git a/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufServiceTest.java b/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufServiceTest.java new file mode 100644 index 0000000..a29ee34 --- /dev/null +++ b/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufServiceTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.calcite.avatica.remote; + +import org.apache.calcite.avatica.proto.Requests; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Test class for ProtobufService. + */ +public class ProtobufServiceTest { + + @Test public void testCastProtobufMessage() { + final Requests.CommitRequest commitReq = + Requests.CommitRequest.newBuilder().setConnectionId("cnxn1").build(); + final Requests.RollbackRequest rollbackReq = + Requests.RollbackRequest.newBuilder().setConnectionId("cnxn1").build(); + + assertEquals(commitReq, + ProtobufService.castProtobufMessage(commitReq, Requests.CommitRequest.class)); + assertEquals(rollbackReq, + ProtobufService.castProtobufMessage(rollbackReq, Requests.RollbackRequest.class)); + + try { + ProtobufService.castProtobufMessage(commitReq, Requests.RollbackRequest.class); + fail("Should have seen IllegalArgumentException casting CommitRequest into RollbackRequest"); + } catch (IllegalArgumentException e) { + // Expected + } + + try { + ProtobufService.castProtobufMessage(rollbackReq, Requests.CommitRequest.class); + fail("Should have seen IllegalArgumentException casting RollbackRequest into CommitRequest"); + } catch (IllegalArgumentException e) { + // Expected + } + } +} + +// End ProtobufServiceTest.java http://git-wip-us.apache.org/repos/asf/calcite/blob/322b9730/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufTranslationImplTest.java ---------------------------------------------------------------------- diff --git a/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufTranslationImplTest.java b/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufTranslationImplTest.java index 83fd426..cfc92d9 100644 --- a/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufTranslationImplTest.java +++ b/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufTranslationImplTest.java @@ -33,6 +33,8 @@ 
import org.apache.calcite.avatica.remote.Service.CloseConnectionResponse; import org.apache.calcite.avatica.remote.Service.CloseStatementRequest; import org.apache.calcite.avatica.remote.Service.CloseStatementResponse; import org.apache.calcite.avatica.remote.Service.ColumnsRequest; +import org.apache.calcite.avatica.remote.Service.CommitRequest; +import org.apache.calcite.avatica.remote.Service.CommitResponse; import org.apache.calcite.avatica.remote.Service.ConnectionSyncRequest; import org.apache.calcite.avatica.remote.Service.ConnectionSyncResponse; import org.apache.calcite.avatica.remote.Service.CreateStatementRequest; @@ -51,6 +53,8 @@ import org.apache.calcite.avatica.remote.Service.PrepareResponse; import org.apache.calcite.avatica.remote.Service.Request; import org.apache.calcite.avatica.remote.Service.Response; import org.apache.calcite.avatica.remote.Service.ResultSetResponse; +import org.apache.calcite.avatica.remote.Service.RollbackRequest; +import org.apache.calcite.avatica.remote.Service.RollbackResponse; import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse; import org.apache.calcite.avatica.remote.Service.SchemasRequest; import org.apache.calcite.avatica.remote.Service.SyncResultsRequest; @@ -206,6 +210,9 @@ public class ProtobufTranslationImplTest<T> { requests.add(new SyncResultsRequest("connectionId2", 54321, getMetadataQueryState1(), 0)); requests.add(new SyncResultsRequest("connectionId3", 5, getMetadataQueryState2(), 10)); + requests.add(new CommitRequest("connectionId")); + requests.add(new RollbackRequest("connectionId")); + return requests; } @@ -324,6 +331,9 @@ public class ProtobufTranslationImplTest<T> { responses.add( new ErrorResponse(Arrays.asList("stacktrace1", "stacktrace2"), null, 0, null, null, null)); + responses.add(new CommitResponse()); + responses.add(new RollbackResponse()); + return responses; } 
http://git-wip-us.apache.org/repos/asf/calcite/blob/322b9730/avatica/src/test/java/org/apache/calcite/avatica/test/JsonHandlerTest.java ---------------------------------------------------------------------- diff --git a/avatica/src/test/java/org/apache/calcite/avatica/test/JsonHandlerTest.java b/avatica/src/test/java/org/apache/calcite/avatica/test/JsonHandlerTest.java index 6fb9ce3..4445126 100644 --- a/avatica/src/test/java/org/apache/calcite/avatica/test/JsonHandlerTest.java +++ b/avatica/src/test/java/org/apache/calcite/avatica/test/JsonHandlerTest.java @@ -120,6 +120,14 @@ public class JsonHandlerTest { } @Override public void setRpcMetadata(RpcMetadataResponse metadata) {} + + @Override public CommitResponse apply(CommitRequest request) { + return null; + } + + @Override public RollbackResponse apply(RollbackRequest request) { + return null; + } } /** http://git-wip-us.apache.org/repos/asf/calcite/blob/322b9730/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java b/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java index 96b0f54..01b07bc 100644 --- a/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java +++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java @@ -658,6 +658,14 @@ public class CalciteMetaImpl extends MetaImpl { throw new UnsupportedOperationException(); } + @Override public void commit(ConnectionHandle ch) { + throw new UnsupportedOperationException(); + } + + @Override public void rollback(ConnectionHandle ch) { + throw new UnsupportedOperationException(); + } + /** Metadata describing a Calcite table. 
*/ private static class CalciteMetaTable extends MetaTable { private final Table calciteTable; http://git-wip-us.apache.org/repos/asf/calcite/blob/322b9730/site/_docs/avatica_json_reference.md ---------------------------------------------------------------------- diff --git a/site/_docs/avatica_json_reference.md b/site/_docs/avatica_json_reference.md index ce624ae..c2244a6 100644 --- a/site/_docs/avatica_json_reference.md +++ b/site/_docs/avatica_json_reference.md @@ -8,12 +8,14 @@ requests: - { name: "CloseConnectionRequest" } - { name: "CloseStatementRequest" } - { name: "ColumnsRequest" } + - { name: "CommitRequest" } - { name: "ConnectionSyncRequest" } - { name: "CreateStatementRequest" } - { name: "DatabasePropertyRequest" } - { name: "FetchRequest" } - { name: "PrepareAndExecuteRequest" } - { name: "PrepareRequest" } + - { name: "RollbackRequest" } - { name: "SchemasRequest" } - { name: "TableTypesRequest" } - { name: "TablesRequest" } @@ -41,6 +43,8 @@ responses: - { name: "CloseConnectionResponse" } - { name: "ConnectionSyncResponse" } - { name: "DatabasePropertyResponse" } + - { name: "CommitResponse" } + - { name: "RollbackResponse" } --- <!-- @@ -151,6 +155,17 @@ There are no extra attributes on this Request. `columnNamePattern` (optional string) A Java Pattern against column names to limit returned columns. +### CommitRequest + +{% highlight json %} +{ + "request": "commit", + "connectionId": "000000-0000-0000-00000000" +} +{% endhighlight %} + +`connectionId` (required string) The identifier of the connection on which to invoke commit. + ### ConnectionSyncRequest {% highlight json %} @@ -246,6 +261,17 @@ There are no extra attributes on this Request. `maxRowCount` (required long) The maximum number of rows returned in the response. 
+### RollbackRequest + +{% highlight json %} +{ + "request": "rollback", + "connectionId": "000000-0000-0000-00000000" +} +{% endhighlight %} + +`connectionId` (required string) The identifier for the connection on which to invoke rollback. + ### SchemasRequest {% highlight json %} @@ -423,6 +449,26 @@ There are no extra attributes on this Response. `map` A map of <a href="#databaseproperty">DatabaseProperty</a> to value of that property. The value may be some primitive type or an array of primitive types. +### CommitResponse + +{% highlight json %} +{ + "response": "commit" +} +{% endhighlight %} + +There are no extra attributes on this Response. + +### RollbackResponse + +{% highlight json %} +{ + "response": "rollback" +} +{% endhighlight %} + +There are no extra attributes on this Response. + ## Miscellaneous ### ConnectionProperties
