WIP on messages.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f3cfae0f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f3cfae0f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f3cfae0f Branch: refs/heads/ignite-4565-ddl Commit: f3cfae0f5666274c420b23256942e2134d611d83 Parents: cfe4aeb Author: devozerov <[email protected]> Authored: Tue Mar 21 16:08:36 2017 +0300 Committer: devozerov <[email protected]> Committed: Tue Mar 21 16:08:36 2017 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 10 +- .../query/ddl/IndexOperationStatusMessage.java | 149 ------------------- .../query/ddl/IndexOperationStatusRequest.java | 122 +++++++++++++++ .../query/ddl/IndexOperationStatusResponse.java | 149 +++++++++++++++++++ 4 files changed, 279 insertions(+), 151 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f3cfae0f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index ddb486f..d2dfc1f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -137,7 +137,8 @@ import org.apache.ignite.internal.processors.igfs.IgfsFragmentizerResponse; import org.apache.ignite.internal.processors.igfs.IgfsSyncMessage; import org.apache.ignite.internal.processors.marshaller.MissingMappingRequestMessage; import org.apache.ignite.internal.processors.marshaller.MissingMappingResponseMessage; -import org.apache.ignite.internal.processors.query.ddl.IndexOperationStatusMessage; +import org.apache.ignite.internal.processors.query.ddl.IndexOperationStatusRequest; +import org.apache.ignite.internal.processors.query.ddl.IndexOperationStatusResponse; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest; @@ -864,7 +865,12 @@ public class GridIoMessageFactory implements MessageFactory { break; case -49: - msg = new IndexOperationStatusMessage(); + msg = new IndexOperationStatusRequest(); + + break; + + case -50: + msg = new IndexOperationStatusResponse(); break; http://git-wip-us.apache.org/repos/asf/ignite/blob/f3cfae0f/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusMessage.java deleted file mode 100644 index fb160ff..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusMessage.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query.ddl; - -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; - -import java.nio.ByteBuffer; -import java.util.UUID; - -/** - * Message with index operation status. Sent from participant to coordinator when index creation is completed or - * when coordinator changes. - */ -public class IndexOperationStatusMessage implements Message { - /** */ - private static final long serialVersionUID = 0L; - - /** Operation ID. */ - private UUID opId; - - /** Error message. */ - private String errMsg; - - /** - * Default constructor. - */ - public IndexOperationStatusMessage() { - // No-op. - } - - /** - * Constructor. - * - * @param opId Operation ID. - * @param errMsg Error message. - */ - public IndexOperationStatusMessage(UUID opId, String errMsg) { - this.opId = opId; - this.errMsg = errMsg; - } - - /** - * @return Operation ID. - */ - public UUID operationId() { - return opId; - } - - /** - * @return Error message. - */ - public String errorMessage() { - return errMsg; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeUuid("opId", opId)) - return false; - - writer.incrementState(); - - case 1: - if (!writer.writeString("errMsg", errMsg)) - return false; - - writer.incrementState(); - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - switch (reader.state()) { - case 0: - opId = reader.readUuid("opId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - errMsg = reader.readString("errMsg"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - } - - return reader.afterMessageRead(IndexOperationStatusMessage.class); - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return -49; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 2; - } - - /** {@inheritDoc} */ - @Override public void onAckReceived() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IndexOperationStatusMessage.class, this); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f3cfae0f/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusRequest.java new file mode 100644 index 0000000..766eecf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusRequest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.ddl; + +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +import java.nio.ByteBuffer; +import java.util.UUID; + +/** + * Operation status request. + */ +public class IndexOperationStatusRequest implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** Operation ID. */ + private UUID opId; + + /** + * Default constructor. + */ + public IndexOperationStatusRequest() { + // No-op. + } + + /** + * Constructor. + * + * @param opId Operation ID. + */ + public IndexOperationStatusRequest(UUID opId, String errMsg) { + this.opId = opId; + } + + /** + * @return Operation ID. + */ + public UUID operationId() { + return opId; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeUuid("opId", opId)) + return false; + + writer.incrementState(); + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + opId = reader.readUuid("opId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } + + return reader.afterMessageRead(IndexOperationStatusRequest.class); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -49; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IndexOperationStatusRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f3cfae0f/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusResponse.java new file mode 100644 index 0000000..e9220c9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationStatusResponse.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.ddl; + +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +import java.nio.ByteBuffer; +import java.util.UUID; + +/** + * Message with index operation status. Sent from participant to coordinator when index creation is completed or + * when coordinator changes. + */ +public class IndexOperationStatusResponse implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** Operation ID. */ + private UUID opId; + + /** Error message. */ + private String errMsg; + + /** + * Default constructor. + */ + public IndexOperationStatusResponse() { + // No-op. + } + + /** + * Constructor. + * + * @param opId Operation ID. + * @param errMsg Error message. + */ + public IndexOperationStatusResponse(UUID opId, String errMsg) { + this.opId = opId; + this.errMsg = errMsg; + } + + /** + * @return Operation ID. + */ + public UUID operationId() { + return opId; + } + + /** + * @return Error message. + */ + public String errorMessage() { + return errMsg; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeUuid("opId", opId)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeString("errMsg", errMsg)) + return false; + + writer.incrementState(); + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + opId = reader.readUuid("opId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + errMsg = reader.readString("errMsg"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } + + return reader.afterMessageRead(IndexOperationStatusResponse.class); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -50; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IndexOperationStatusResponse.class, this); + } +}
