http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java index 0000000,794a02b..c842a06 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java @@@ -1,0 -1,548 +1,534 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.dataload; + + import org.apache.ignite.configuration.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.internal.util.direct.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.internal.util.tostring.*; + import org.jetbrains.annotations.*; + + import java.nio.*; + import java.util.*; + + /** + * + */ + public class GridDataLoadRequest extends GridTcpCommunicationMessageAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long reqId; + + /** */ + private byte[] resTopicBytes; + + /** Cache name. */ + private String cacheName; + + /** */ + private byte[] updaterBytes; + + /** Entries to put. */ + private byte[] colBytes; + + /** {@code True} to ignore deployment ownership. */ + private boolean ignoreDepOwnership; + + /** */ + private boolean skipStore; + + /** */ + private IgniteDeploymentMode depMode; + + /** */ + private String sampleClsName; + + /** */ + private String userVer; + + /** Node class loader participants. */ + @GridToStringInclude + @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class) + private Map<UUID, IgniteUuid> ldrParticipants; + + /** */ + private IgniteUuid clsLdrId; + + /** */ + private boolean forceLocDep; + + /** + * {@code Externalizable} support. + */ + public GridDataLoadRequest() { + // No-op. + } + + /** + * @param reqId Request ID. + * @param resTopicBytes Response topic. + * @param cacheName Cache name. + * @param updaterBytes Cache updater. + * @param colBytes Collection bytes. + * @param ignoreDepOwnership Ignore ownership. + * @param skipStore Skip store flag. + * @param depMode Deployment mode. + * @param sampleClsName Sample class name. + * @param userVer User version. + * @param ldrParticipants Loader participants. + * @param clsLdrId Class loader ID. + * @param forceLocDep Force local deployment. + */ + public GridDataLoadRequest(long reqId, + byte[] resTopicBytes, + @Nullable String cacheName, + byte[] updaterBytes, + byte[] colBytes, + boolean ignoreDepOwnership, + boolean skipStore, + IgniteDeploymentMode depMode, + String sampleClsName, + String userVer, + Map<UUID, IgniteUuid> ldrParticipants, + IgniteUuid clsLdrId, + boolean forceLocDep) { + this.reqId = reqId; + this.resTopicBytes = resTopicBytes; + this.cacheName = cacheName; + this.updaterBytes = updaterBytes; + this.colBytes = colBytes; + this.ignoreDepOwnership = ignoreDepOwnership; + this.skipStore = skipStore; + this.depMode = depMode; + this.sampleClsName = sampleClsName; + this.userVer = userVer; + this.ldrParticipants = ldrParticipants; + this.clsLdrId = clsLdrId; + this.forceLocDep = forceLocDep; + } + + /** + * @return Request ID. + */ + public long requestId() { + return reqId; + } + + /** + * @return Response topic. + */ + public byte[] responseTopicBytes() { + return resTopicBytes; + } + + /** + * @return Cache name. + */ + public String cacheName() { + return cacheName; + } + + /** + * @return Updater. + */ + public byte[] updaterBytes() { + return updaterBytes; + } + + /** + * @return Collection bytes. + */ + public byte[] collectionBytes() { + return colBytes; + } + + /** + * @return {@code True} to ignore ownership. + */ + public boolean ignoreDeploymentOwnership() { + return ignoreDepOwnership; + } + + /** + * @return Skip store flag. + */ + public boolean skipStore() { + return skipStore; + } + + /** + * @return Deployment mode. + */ + public IgniteDeploymentMode deploymentMode() { + return depMode; + } + + /** + * @return Sample class name. + */ + public String sampleClassName() { + return sampleClsName; + } + + /** + * @return User version. + */ + public String userVersion() { + return userVer; + } + + /** + * @return Participants. + */ + public Map<UUID, IgniteUuid> participants() { + return ldrParticipants; + } + + /** + * @return Class loader ID. + */ + public IgniteUuid classLoaderId() { + return clsLdrId; + } + + /** + * @return {@code True} to force local deployment. + */ + public boolean forceLocalDeployment() { + return forceLocDep; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDataLoadRequest.class, this); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!commState.typeWritten) { - if (!commState.putByte(directType())) ++ if (!commState.putByte(null, directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 0: - if (!commState.putString(cacheName)) ++ if (!commState.putString("cacheName", cacheName)) + return false; + + commState.idx++; + + case 1: - if (!commState.putGridUuid(clsLdrId)) ++ if (!commState.putGridUuid("clsLdrId", clsLdrId)) + return false; + + commState.idx++; + + case 2: - if (!commState.putByteArray(colBytes)) ++ if (!commState.putByteArray("colBytes", colBytes)) + return false; + + commState.idx++; + + case 3: - if (!commState.putEnum(depMode)) ++ if (!commState.putEnum("depMode", depMode)) + return false; + + commState.idx++; + + case 4: - if (!commState.putBoolean(forceLocDep)) ++ if (!commState.putBoolean("forceLocDep", forceLocDep)) + return false; + + commState.idx++; + + case 5: - if (!commState.putBoolean(ignoreDepOwnership)) ++ if (!commState.putBoolean("ignoreDepOwnership", ignoreDepOwnership)) + return false; + + commState.idx++; + + case 6: + if (ldrParticipants != null) { + if (commState.it == null) { - if (!commState.putInt(ldrParticipants.size())) ++ if (!commState.putInt(null, ldrParticipants.size())) + return false; + + commState.it = ldrParticipants.entrySet().iterator(); + } + + while (commState.it.hasNext() || commState.cur != NULL) { + if (commState.cur == NULL) + commState.cur = commState.it.next(); + + Map.Entry<UUID, IgniteUuid> e = (Map.Entry<UUID, IgniteUuid>)commState.cur; + + if (!commState.keyDone) { - if (!commState.putUuid(e.getKey())) ++ if (!commState.putUuid(null, e.getKey())) + return false; + + commState.keyDone = true; + } + - if (!commState.putGridUuid(e.getValue())) ++ if (!commState.putGridUuid(null, e.getValue())) + return false; + + commState.keyDone = false; + + commState.cur = NULL; + } + + commState.it = null; + } else { - if (!commState.putInt(-1)) ++ if (!commState.putInt(null, -1)) + return false; + } + + commState.idx++; + + case 7: - if (!commState.putLong(reqId)) ++ if (!commState.putLong("reqId", reqId)) + return false; + + commState.idx++; + + case 8: - if (!commState.putByteArray(resTopicBytes)) ++ if (!commState.putByteArray("resTopicBytes", resTopicBytes)) + return false; + + commState.idx++; + + case 9: - if (!commState.putString(sampleClsName)) ++ if (!commState.putString("sampleClsName", sampleClsName)) + return false; + + commState.idx++; + + case 10: - if (!commState.putBoolean(skipStore)) ++ if (!commState.putBoolean(null, skipStore)) + return false; + + commState.idx++; + + case 11: - if (!commState.putByteArray(updaterBytes)) ++ if (!commState.putByteArray(null, updaterBytes)) + return false; + + commState.idx++; + + case 12: - if (!commState.putString(userVer)) ++ if (!commState.putString(null, userVer)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + switch (commState.idx) { + case 0: - String cacheName0 = commState.getString(); ++ cacheName = commState.getString("cacheName"); + - if (cacheName0 == STR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - cacheName = cacheName0; - + commState.idx++; + + case 1: - IgniteUuid clsLdrId0 = commState.getGridUuid(); ++ clsLdrId = commState.getGridUuid("clsLdrId"); + - if (clsLdrId0 == GRID_UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + - clsLdrId = clsLdrId0; - + commState.idx++; + + case 2: - byte[] colBytes0 = commState.getByteArray(); ++ colBytes = commState.getByteArray("colBytes"); + - if (colBytes0 == BYTE_ARR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - colBytes = colBytes0; - + commState.idx++; + + case 3: - if (buf.remaining() < 1) - return false; ++ byte depMode0 = commState.getByte("depMode"); + - byte depMode0 = commState.getByte(); ++ if (!commState.lastRead()) ++ return false; + + depMode = IgniteDeploymentMode.fromOrdinal(depMode0); + + commState.idx++; + + case 4: - if (buf.remaining() < 1) - return false; ++ forceLocDep = commState.getBoolean("forceLocDep"); + - forceLocDep = commState.getBoolean(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + case 5: - if (buf.remaining() < 1) - return false; ++ ignoreDepOwnership = commState.getBoolean("ignoreDepOwnership"); + - ignoreDepOwnership = commState.getBoolean(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + case 6: + if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; ++ commState.readSize = commState.getInt(null); + - commState.readSize = commState.getInt(); ++ if (!commState.lastRead()) ++ return false; + } + + if (commState.readSize >= 0) { + if (ldrParticipants == null) + ldrParticipants = new HashMap<>(commState.readSize, 1.0f); + + for (int i = commState.readItems; i < commState.readSize; i++) { + if (!commState.keyDone) { - UUID _val = commState.getUuid(); ++ UUID _val = commState.getUuid(null); + - if (_val == UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + + commState.cur = _val; + commState.keyDone = true; + } + - IgniteUuid _val = commState.getGridUuid(); ++ IgniteUuid _val = commState.getGridUuid(null); + - if (_val == GRID_UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + + ldrParticipants.put((UUID)commState.cur, _val); + + commState.keyDone = false; + + commState.readItems++; + } + } + + commState.readSize = -1; + commState.readItems = 0; + commState.cur = null; + + commState.idx++; + + case 7: - if (buf.remaining() < 8) - return false; ++ reqId = commState.getLong("reqId"); + - reqId = commState.getLong(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + case 8: - byte[] resTopicBytes0 = commState.getByteArray(); ++ resTopicBytes = commState.getByteArray("resTopicBytes"); + - if (resTopicBytes0 == BYTE_ARR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - resTopicBytes = resTopicBytes0; - + commState.idx++; + + case 9: - String sampleClsName0 = commState.getString(); ++ sampleClsName = commState.getString("sampleClsName"); + - if (sampleClsName0 == STR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - sampleClsName = sampleClsName0; - + commState.idx++; + + case 10: + if (buf.remaining() < 1) + return false; + - skipStore = commState.getBoolean(); ++ skipStore = commState.getBoolean(null); + + commState.idx++; + + case 11: - byte[] updaterBytes0 = commState.getByteArray(); ++ byte[] updaterBytes0 = commState.getByteArray(null); + - if (updaterBytes0 == BYTE_ARR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - updaterBytes = updaterBytes0; - + commState.idx++; + + case 12: - String userVer0 = commState.getString(); ++ String userVer0 = commState.getString(null); + - if (userVer0 == STR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - userVer = userVer0; - + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 61; + } + + /** {@inheritDoc} */ + @Override public GridTcpCommunicationMessageAdapter clone() { + GridDataLoadRequest _clone = new GridDataLoadRequest(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + GridDataLoadRequest _clone = (GridDataLoadRequest)_msg; + + _clone.reqId = reqId; + _clone.resTopicBytes = resTopicBytes; + _clone.cacheName = cacheName; + _clone.updaterBytes = updaterBytes; + _clone.colBytes = colBytes; + _clone.ignoreDepOwnership = ignoreDepOwnership; + _clone.skipStore = skipStore; + _clone.depMode = depMode; + _clone.sampleClsName = sampleClsName; + _clone.userVer = userVer; + _clone.ldrParticipants = ldrParticipants; + _clone.clsLdrId = clsLdrId; + _clone.forceLocDep = forceLocDep; + } + }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java index 0000000,6f784f5..ed193a9 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java @@@ -1,0 -1,181 +1,179 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.dataload; + + import org.apache.ignite.internal.util.direct.*; + import org.apache.ignite.internal.util.typedef.internal.*; + + import java.nio.*; + + /** + * + */ + public class GridDataLoadResponse extends GridTcpCommunicationMessageAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long reqId; + + /** */ + private byte[] errBytes; + + /** */ + private boolean forceLocDep; + + /** + * @param reqId Request ID. + * @param errBytes Error bytes. + * @param forceLocDep Force local deployment. + */ + public GridDataLoadResponse(long reqId, byte[] errBytes, boolean forceLocDep) { + this.reqId = reqId; + this.errBytes = errBytes; + this.forceLocDep = forceLocDep; + } + + /** + * {@code Externalizable} support. + */ + public GridDataLoadResponse() { + // No-op. + } + + /** + * @return Request ID. + */ + public long requestId() { + return reqId; + } + + /** + * @return Error bytes. + */ + public byte[] errorBytes() { + return errBytes; + } + + /** + * @return {@code True} to force local deployment. + */ + public boolean forceLocalDeployment() { + return forceLocDep; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDataLoadResponse.class, this); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridDataLoadResponse _clone = new GridDataLoadResponse(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + GridDataLoadResponse _clone = (GridDataLoadResponse)_msg; + + _clone.reqId = reqId; + _clone.errBytes = errBytes; + _clone.forceLocDep = forceLocDep; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!commState.typeWritten) { - if (!commState.putByte(directType())) ++ if (!commState.putByte(null, directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 0: - if (!commState.putByteArray(errBytes)) ++ if (!commState.putByteArray("errBytes", errBytes)) + return false; + + commState.idx++; + + case 1: - if (!commState.putBoolean(forceLocDep)) ++ if (!commState.putBoolean("forceLocDep", forceLocDep)) + return false; + + commState.idx++; + + case 2: - if (!commState.putLong(reqId)) ++ if (!commState.putLong("reqId", reqId)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + switch (commState.idx) { + case 0: - byte[] errBytes0 = commState.getByteArray(); ++ errBytes = commState.getByteArray("errBytes"); + - if (errBytes0 == BYTE_ARR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - errBytes = errBytes0; - + commState.idx++; + + case 1: - if (buf.remaining() < 1) - return false; ++ forceLocDep = commState.getBoolean("forceLocDep"); + - forceLocDep = commState.getBoolean(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + case 2: - if (buf.remaining() < 8) - return false; ++ reqId = commState.getLong("reqId"); + - reqId = commState.getLong(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 62; + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsAckMessage.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsAckMessage.java index 0000000,eabc2b1..e39bc6b mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsAckMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsAckMessage.java @@@ -1,0 -1,212 +1,208 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.fs; + + import org.apache.ignite.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.marshaller.*; + import org.apache.ignite.internal.util.direct.*; + import org.jetbrains.annotations.*; + + import java.io.*; + import java.nio.*; + + /** + * Block write request acknowledgement message. + */ + public class GridGgfsAckMessage extends GridGgfsCommunicationMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** File id. */ + private IgniteUuid fileId; + + /** Request ID to ack. */ + private long id; + + /** Write exception. */ + @GridDirectTransient + private IgniteCheckedException err; + + /** */ + private byte[] errBytes; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public GridGgfsAckMessage() { + // No-op. + } + + /** + * @param fileId File ID. + * @param id Request ID. + * @param err Error. + */ + public GridGgfsAckMessage(IgniteUuid fileId, long id, @Nullable IgniteCheckedException err) { + this.fileId = fileId; + this.id = id; + this.err = err; + } + + /** + * @return File ID. + */ + public IgniteUuid fileId() { + return fileId; + } + + /** + * @return Batch ID. + */ + public long id() { + return id; + } + + /** + * @return Error occurred when writing this batch, if any. + */ + public IgniteCheckedException error() { + return err; + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(IgniteMarshaller marsh) throws IgniteCheckedException { + super.prepareMarshal(marsh); + + if (err != null) + errBytes = marsh.marshal(err); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(IgniteMarshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(marsh, ldr); + + if (errBytes != null) + err = marsh.unmarshal(errBytes, ldr); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridGgfsAckMessage _clone = new GridGgfsAckMessage(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + super.clone0(_msg); + + GridGgfsAckMessage _clone = (GridGgfsAckMessage)_msg; + + _clone.fileId = fileId; + _clone.id = id; + _clone.err = err; + _clone.errBytes = errBytes; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.writeTo(buf)) + return false; + + if (!commState.typeWritten) { - if (!commState.putByte(directType())) ++ if (!commState.putByte(null, directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 0: - if (!commState.putByteArray(errBytes)) ++ if (!commState.putByteArray("errBytes", errBytes)) + return false; + + commState.idx++; + + case 1: - if (!commState.putGridUuid(fileId)) ++ if (!commState.putGridUuid("fileId", fileId)) + return false; + + commState.idx++; + + case 2: - if (!commState.putLong(id)) ++ if (!commState.putLong("id", id)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.readFrom(buf)) + return false; + + switch (commState.idx) { + case 0: - byte[] errBytes0 = commState.getByteArray(); ++ errBytes = commState.getByteArray("errBytes"); + - if (errBytes0 == BYTE_ARR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - errBytes = errBytes0; - + commState.idx++; + + case 1: - IgniteUuid fileId0 = commState.getGridUuid(); ++ fileId = commState.getGridUuid("fileId"); + - if (fileId0 == GRID_UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + - fileId = fileId0; - + commState.idx++; + + case 2: - if (buf.remaining() < 8) - return false; ++ id = commState.getLong("id"); + - id = commState.getLong(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 65; + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsBlockKey.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsBlockKey.java index 0000000,84fcaa6..101db19 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsBlockKey.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsBlockKey.java @@@ -1,0 -1,279 +1,275 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.fs; + + import org.apache.ignite.lang.*; + import org.apache.ignite.internal.processors.task.*; + import org.apache.ignite.internal.util.direct.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.jetbrains.annotations.*; + + import java.io.*; + import java.nio.*; + + /** + * File's binary data block key. + */ + @GridInternal + public final class GridGgfsBlockKey extends GridTcpCommunicationMessageAdapter + implements Externalizable, Comparable<GridGgfsBlockKey> { + /** */ + private static final long serialVersionUID = 0L; + + /** File system file ID. */ + private IgniteUuid fileId; + + /** Block ID. */ + private long blockId; + + /** Block affinity key. */ + private IgniteUuid affKey; + + /** Eviction exclude flag. */ + private boolean evictExclude; + + /** + * Constructs file's binary data block key. + * + * @param fileId File ID. + * @param affKey Affinity key. + * @param evictExclude Evict exclude flag. + * @param blockId Block ID. + */ + public GridGgfsBlockKey(IgniteUuid fileId, @Nullable IgniteUuid affKey, boolean evictExclude, long blockId) { + assert fileId != null; + assert blockId >= 0; + + this.fileId = fileId; + this.affKey = affKey; + this.evictExclude = evictExclude; + this.blockId = blockId; + } + + /** + * Empty constructor required for {@link Externalizable}. + */ + public GridGgfsBlockKey() { + // No-op. + } + + /** + * @return File ID. + */ + public IgniteUuid getFileId() { + return fileId; + } + + /** + * @return Block affinity key. + */ + public IgniteUuid affinityKey() { + return affKey; + } + + /** + * @return Evict exclude flag. + */ + public boolean evictExclude() { + return evictExclude; + } + + /** + * @return Block ID. + */ + public long getBlockId() { + return blockId; + } + + /** {@inheritDoc} */ + @Override public int compareTo(@NotNull GridGgfsBlockKey o) { + int res = fileId.compareTo(o.fileId); + + if (res != 0) + return res; + + long v1 = blockId; + long v2 = o.blockId; + + if (v1 != v2) + return v1 > v2 ? 1 : -1; + + if (affKey == null && o.affKey == null) + return 0; + + if (affKey != null && o.affKey != null) + return affKey.compareTo(o.affKey); + + return affKey != null ? -1 : 1; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeGridUuid(out, fileId); + U.writeGridUuid(out, affKey); + out.writeBoolean(evictExclude); + out.writeLong(blockId); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException { + fileId = U.readGridUuid(in); + affKey = U.readGridUuid(in); + evictExclude = in.readBoolean(); + blockId = in.readLong(); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return fileId.hashCode() + (int)(blockId ^ (blockId >>> 32)); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (o == this) + return true; + + if (o == null || o.getClass() != getClass()) + return false; + + GridGgfsBlockKey that = (GridGgfsBlockKey)o; + + return blockId == that.blockId && fileId.equals(that.fileId) && F.eq(affKey, that.affKey) && + evictExclude == that.evictExclude; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridGgfsBlockKey _clone = new GridGgfsBlockKey(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + GridGgfsBlockKey _clone = (GridGgfsBlockKey)_msg; + + _clone.fileId = fileId; + _clone.blockId = blockId; + _clone.affKey = affKey; + _clone.evictExclude = evictExclude; + } + + /** {@inheritDoc} */ + @SuppressWarnings("fallthrough") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!commState.typeWritten) { - if (!commState.putByte(directType())) ++ if (!commState.putByte(null, directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 0: - if (!commState.putGridUuid(affKey)) ++ if (!commState.putGridUuid("affKey", affKey)) + return false; + + commState.idx++; + + case 1: - if (!commState.putLong(blockId)) ++ if (!commState.putLong("blockId", blockId)) + return false; + + commState.idx++; + + case 2: - if (!commState.putBoolean(evictExclude)) ++ if (!commState.putBoolean("evictExclude", evictExclude)) + return false; + + commState.idx++; + + case 3: - if (!commState.putGridUuid(fileId)) ++ if (!commState.putGridUuid("fileId", fileId)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("fallthrough") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + switch (commState.idx) { + case 0: - IgniteUuid affKey0 = commState.getGridUuid(); ++ affKey = commState.getGridUuid("affKey"); + - if (affKey0 == GRID_UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + - affKey = affKey0; - + commState.idx++; + + case 1: - if (buf.remaining() < 8) - return false; ++ blockId = commState.getLong("blockId"); + - blockId = commState.getLong(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + case 2: - if (buf.remaining() < 1) - return false; ++ evictExclude = commState.getBoolean("evictExclude"); + - evictExclude = commState.getBoolean(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + case 3: - IgniteUuid fileId0 = commState.getGridUuid(); ++ fileId = commState.getGridUuid("fileId"); + - if (fileId0 == GRID_UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + - fileId = fileId0; - + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 66; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridGgfsBlockKey.class, this); + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsBlocksMessage.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsBlocksMessage.java index 0000000,2d90e86..2a1f8a1 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsBlocksMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsBlocksMessage.java @@@ -1,0 -1,257 +1,254 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.fs; + + import org.apache.ignite.internal.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.typedef.internal.*; + + import java.io.*; + import java.nio.*; + import java.util.*; + + /** + * GGFS write blocks message. + */ + public class GridGgfsBlocksMessage extends GridGgfsCommunicationMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** File id. */ + private IgniteUuid fileId; + + /** Batch id */ + private long id; + + /** Blocks to store. */ + @GridDirectMap(keyType = GridGgfsBlockKey.class, valueType = byte[].class) + private Map<GridGgfsBlockKey, byte[]> blocks; + + /** + * Empty constructor required by {@link Externalizable} + */ + public GridGgfsBlocksMessage() { + // No-op. + } + + /** + * Constructor. + * + * @param fileId File ID. + * @param id Message id. + * @param blocks Blocks to put in cache. + */ + public GridGgfsBlocksMessage(IgniteUuid fileId, long id, Map<GridGgfsBlockKey, byte[]> blocks) { + this.fileId = fileId; + this.id = id; + this.blocks = blocks; + } + + /** + * @return File id. + */ + public IgniteUuid fileId() { + return fileId; + } + + /** + * @return Batch id. + */ + public long id() { + return id; + } + + /** + * @return Map of blocks to put in cache. + */ + public Map<GridGgfsBlockKey, byte[]> blocks() { + return blocks; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridGgfsBlocksMessage _clone = new GridGgfsBlocksMessage(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + super.clone0(_msg); + + GridGgfsBlocksMessage _clone = (GridGgfsBlocksMessage)_msg; + + _clone.fileId = fileId; + _clone.id = id; + _clone.blocks = blocks; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.writeTo(buf)) + return false; + + if (!commState.typeWritten) { - if (!commState.putByte(directType())) ++ if (!commState.putByte(null, directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 0: + if (blocks != null) { + if (commState.it == null) { - if (!commState.putInt(blocks.size())) ++ if (!commState.putInt(null, blocks.size())) + return false; + + commState.it = blocks.entrySet().iterator(); + } + + while (commState.it.hasNext() || commState.cur != NULL) { + if (commState.cur == NULL) + commState.cur = commState.it.next(); + + Map.Entry<GridGgfsBlockKey, byte[]> e = (Map.Entry<GridGgfsBlockKey, byte[]>)commState.cur; + + if (!commState.keyDone) { - if (!commState.putMessage(e.getKey())) ++ if (!commState.putMessage(null, e.getKey())) + return false; + + commState.keyDone = true; + } + - if (!commState.putByteArray(e.getValue())) ++ if (!commState.putByteArray(null, e.getValue())) + return false; + + commState.keyDone = false; + + commState.cur = NULL; + } + + commState.it = null; + } else { - if (!commState.putInt(-1)) ++ if (!commState.putInt(null, -1)) + return false; + } + + commState.idx++; + + case 1: - if (!commState.putGridUuid(fileId)) ++ if (!commState.putGridUuid("fileId", fileId)) + return false; + + commState.idx++; + + case 2: - if (!commState.putLong(id)) ++ if (!commState.putLong("id", id)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.readFrom(buf)) + return false; + + switch (commState.idx) { + case 0: + if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; ++ commState.readSize = commState.getInt(null); + - commState.readSize = commState.getInt(); ++ if (!commState.lastRead()) ++ return false; + } + + if (commState.readSize >= 0) { + if (blocks == null) - blocks = U.newHashMap(commState.readSize); ++ blocks = new HashMap<>(commState.readSize, 1.0f); + + for (int i = commState.readItems; i < commState.readSize; i++) { + if (!commState.keyDone) { - Object _val = commState.getMessage(); ++ GridGgfsBlockKey _val = (GridGgfsBlockKey)commState.getMessage(null); + - if (_val == MSG_NOT_READ) ++ if (!commState.lastRead()) + return false; + + commState.cur = _val; + commState.keyDone = true; + } + - byte[] _val = commState.getByteArray(); ++ byte[] _val = commState.getByteArray(null); + - if (_val == BYTE_ARR_NOT_READ) ++ if (!commState.lastRead()) + return false; + + blocks.put((GridGgfsBlockKey)commState.cur, _val); + + commState.keyDone = false; + + commState.readItems++; + } + } + + commState.readSize = -1; + commState.readItems = 0; + commState.cur = null; + + commState.idx++; + + case 1: - IgniteUuid fileId0 = commState.getGridUuid(); ++ fileId = commState.getGridUuid("fileId"); + - if (fileId0 == GRID_UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + - fileId = fileId0; - + commState.idx++; + + case 2: - if (buf.remaining() < 8) - return false; ++ id = commState.getLong("id"); + - id = commState.getLong(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 67; + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsCommunicationMessage.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsCommunicationMessage.java index 0000000,b4bedd8..dc0b4a9 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsCommunicationMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsCommunicationMessage.java @@@ -1,0 -1,75 +1,75 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.fs; + + import org.apache.ignite.*; + import org.apache.ignite.marshaller.*; + import org.apache.ignite.internal.util.direct.*; + import org.jetbrains.annotations.*; + + import java.nio.*; + + /** + * Base class for all GGFS communication messages sent between nodes. + */ + public abstract class GridGgfsCommunicationMessage extends GridTcpCommunicationMessageAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + } + + /** + * @param marsh Marshaller. + * @throws IgniteCheckedException In case of error. + */ + public void prepareMarshal(IgniteMarshaller marsh) throws IgniteCheckedException { + // No-op. + } + + /** + * @param marsh Marshaller. + * @param ldr Class loader. + * @throws IgniteCheckedException In case of error. + */ + public void finishUnmarshal(IgniteMarshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!commState.typeWritten) { - if (!commState.putByte(directType())) ++ if (!commState.putByte(null, directType())) + return false; + + commState.typeWritten = true; + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + return true; + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDeleteMessage.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDeleteMessage.java index 0000000,2b565c6..cc64415 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDeleteMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDeleteMessage.java @@@ -1,0 -1,206 +1,202 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.fs; + + import org.apache.ignite.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.marshaller.*; + import org.apache.ignite.internal.util.direct.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.jetbrains.annotations.*; + + import java.io.*; + import java.nio.*; + + /** + * Indicates that entry scheduled for delete was actually deleted. + */ + public class GridGgfsDeleteMessage extends GridGgfsCommunicationMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Deleted entry ID. */ + private IgniteUuid id; + + /** Optional error. */ + @GridDirectTransient + private IgniteCheckedException err; + + /** */ + private byte[] errBytes; + + /** + * {@link Externalizable} support. + */ + public GridGgfsDeleteMessage() { + // No-op. + } + + /** + * Constructor. + * + * @param id Deleted entry ID. + */ + public GridGgfsDeleteMessage(IgniteUuid id) { + assert id != null; + + this.id = id; + } + + /** + * Constructor. + * + * @param id Entry ID. + * @param err Error. + */ + public GridGgfsDeleteMessage(IgniteUuid id, IgniteCheckedException err) { + assert err != null; + + this.id = id; + this.err = err; + } + + /** + * @return Deleted entry ID. + */ + public IgniteUuid id() { + return id; + } + + /** + * @return Error. + */ + public IgniteCheckedException error() { + return err; + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(IgniteMarshaller marsh) throws IgniteCheckedException { + super.prepareMarshal(marsh); + + if (err != null) + errBytes = marsh.marshal(err); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(IgniteMarshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(marsh, ldr); + + if (errBytes != null) + err = marsh.unmarshal(errBytes, ldr); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridGgfsDeleteMessage _clone = new GridGgfsDeleteMessage(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + super.clone0(_msg); + + GridGgfsDeleteMessage _clone = (GridGgfsDeleteMessage)_msg; + + _clone.id = id; + _clone.err = err; + _clone.errBytes = errBytes; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.writeTo(buf)) + return false; + + if (!commState.typeWritten) { - if (!commState.putByte(directType())) ++ if (!commState.putByte(null, directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 0: - if (!commState.putByteArray(errBytes)) ++ if (!commState.putByteArray("errBytes", errBytes)) + return false; + + commState.idx++; + + case 1: - if (!commState.putGridUuid(id)) ++ if (!commState.putGridUuid("id", id)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.readFrom(buf)) + return false; + + switch (commState.idx) { + case 0: - byte[] errBytes0 = commState.getByteArray(); ++ errBytes = commState.getByteArray("errBytes"); + - if (errBytes0 == BYTE_ARR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - errBytes = errBytes0; - + commState.idx++; + + case 1: - IgniteUuid id0 = commState.getGridUuid(); ++ id = commState.getGridUuid("id"); + - if (id0 == GRID_UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + - id = id0; - + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 68; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridGgfsDeleteMessage.class, this); + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFileAffinityRange.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFileAffinityRange.java index 0000000,3474ac0..6fb4699 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFileAffinityRange.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFileAffinityRange.java @@@ -1,0 -1,396 +1,394 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.fs; + + import org.apache.ignite.lang.*; + import org.apache.ignite.internal.util.direct.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.jetbrains.annotations.*; + + import java.io.*; + import java.nio.*; + import java.util.*; + + /** + * Affinity range. + */ + public class GridGgfsFileAffinityRange extends GridTcpCommunicationMessageAdapter implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Initial range status, right after creation. */ + public static final int RANGE_STATUS_INITIAL = 0; + + /** Moving range state. Fragmentizer started blocks copy. */ + public static final int RANGE_STATUS_MOVING = 1; + + /** Fragmentizer finished block copy for this range. */ + public static final int RANGE_STATUS_MOVED = 2; + + /** Range affinity key. */ + private IgniteUuid affKey; + + /** {@code True} if currently being moved by fragmentizer. */ + @SuppressWarnings("RedundantFieldInitialization") + private int status = RANGE_STATUS_INITIAL; + + /** Range start offset (divisible by block size). */ + private long startOff; + + /** Range end offset (endOff + 1 divisible by block size). */ + private long endOff; + + /** Transient flag indicating no further writes should be made to this range. */ + private boolean done; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public GridGgfsFileAffinityRange() { + // No-op. + } + + /** + * @param startOff Start offset. + * @param endOff End offset. + * @param affKey Affinity key. + */ + GridGgfsFileAffinityRange(long startOff, long endOff, IgniteUuid affKey) { + this.startOff = startOff; + this.endOff = endOff; + this.affKey = affKey; + } + + /** + * Creates new range with updated status. + * + * @param other Initial range. + * @param status Updated status. + */ + GridGgfsFileAffinityRange(GridGgfsFileAffinityRange other, int status) { + startOff = other.startOff; + endOff = other.endOff; + affKey = other.affKey; + + this.status = status; + } + + /** + * @return Affinity key for this range. + */ + public IgniteUuid affinityKey() { + return affKey; + } + + /** + * @return Range start offset. + */ + public long startOffset() { + return startOff; + } + + /** + * @return Range end offset. + */ + public long endOffset() { + return endOff; + } + + /** + * @param blockStartOff Block start offset to check. + * @return {@code True} if block with given start offset belongs to this range. + */ + public boolean belongs(long blockStartOff) { + return blockStartOff >= startOff && blockStartOff < endOff; + } + + /** + * @param blockStartOff Block start offset to check. + * @return {@code True} if block with given start offset is located before this range. + */ + public boolean less(long blockStartOff) { + return blockStartOff < startOff; + } + + /** + * @param blockStartOff Block start offset to check. + * @return {@code True} if block with given start offset is located after this range. + */ + public boolean greater(long blockStartOff) { + return blockStartOff > endOff; + } + + /** + * @return If range is empty, i.e. has zero length. + */ + public boolean empty() { + return startOff == endOff; + } + + /** + * @return Range status. + */ + public int status() { + return status; + } + + /** + * Expands this range by given block. + * + * @param blockStartOff Offset of block start. + * @param expansionSize Block size. + */ + public void expand(long blockStartOff, int expansionSize) { + // If we are expanding empty range. + if (endOff == startOff) { + assert endOff == blockStartOff : "Failed to expand range [endOff=" + endOff + + ", blockStartOff=" + blockStartOff + ", expansionSize=" + expansionSize + ']'; + + endOff += expansionSize - 1; + } + else { + assert endOff == blockStartOff - 1; + + endOff += expansionSize; + } + } + + /** + * Splits range into collection if smaller ranges with length equal to {@code maxSize}. + * + * @param maxSize Split part maximum size. + * @return Collection of range parts. + */ + public Collection<GridGgfsFileAffinityRange> split(long maxSize) { + long len = endOff - startOff + 1; + + if (len > maxSize) { + int size = (int)(len / maxSize + 1); + + Collection<GridGgfsFileAffinityRange> res = new ArrayList<>(size); + + long pos = startOff; + + while (pos < endOff + 1) { + long end = Math.min(pos + maxSize - 1, endOff); + + GridGgfsFileAffinityRange part = new GridGgfsFileAffinityRange(pos, end, affKey); + + part.status = status; + + res.add(part); + + pos = end + 1; + } + + return res; + } + else + return Collections.singletonList(this); + } + + /** + * Tries to concatenate this range with a given one. If ranges are not adjacent, will return {@code null}. + * + * @param range Range to concatenate with. + * @return Concatenation result or {@code null} if ranges are not adjacent. + */ + @Nullable public GridGgfsFileAffinityRange concat(GridGgfsFileAffinityRange range) { + if (endOff + 1 != range.startOff || !F.eq(affKey, range.affKey) || status != RANGE_STATUS_INITIAL) + return null; + + return new GridGgfsFileAffinityRange(startOff, range.endOff, affKey); + } + + /** + * Marks this range as done. + */ + public void markDone() { + done = true; + } + + /** + * @return Done flag. + */ + public boolean done() { + return done; + } + + /** + * Checks if range regions are equal. + * + * @param other Other range to check against. + * @return {@code True} if range regions are equal. + */ + public boolean regionEqual(GridGgfsFileAffinityRange other) { + return startOff == other.startOff && endOff == other.endOff; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeGridUuid(out, affKey); + + out.writeInt(status); + + out.writeLong(startOff); + out.writeLong(endOff); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + affKey = U.readGridUuid(in); + + status = in.readInt(); + + startOff = in.readLong(); + endOff = in.readLong(); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridGgfsFileAffinityRange _clone = new GridGgfsFileAffinityRange(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + GridGgfsFileAffinityRange _clone = (GridGgfsFileAffinityRange)_msg; + + _clone.affKey = affKey; + _clone.status = status; + _clone.startOff = startOff; + _clone.endOff = endOff; + _clone.done = done; + } + + /** {@inheritDoc} */ + @SuppressWarnings("fallthrough") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!commState.typeWritten) { - if (!commState.putByte(directType())) ++ if (!commState.putByte(null, directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 0: - if (!commState.putGridUuid(affKey)) ++ if (!commState.putGridUuid("affKey", affKey)) + return false; + + commState.idx++; + + case 1: - if (!commState.putBoolean(done)) ++ if (!commState.putBoolean("done", done)) + return false; + + commState.idx++; + + case 2: - if (!commState.putLong(endOff)) ++ if (!commState.putLong("endOff", endOff)) + return false; + + commState.idx++; + + case 3: - if (!commState.putLong(startOff)) ++ if (!commState.putLong("startOff", startOff)) + return false; + + commState.idx++; + + case 4: - if (!commState.putInt(status)) ++ if (!commState.putInt("status", status)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("fallthrough") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + switch (commState.idx) { + case 0: - IgniteUuid affKey0 = commState.getGridUuid(); ++ affKey = commState.getGridUuid("affKey"); + - if (affKey0 == GRID_UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + - affKey = affKey0; - + commState.idx++; + + case 1: - if (buf.remaining() < 1) - return false; ++ done = commState.getBoolean("done"); + - done = commState.getBoolean(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + case 2: - if (buf.remaining() < 8) - return false; ++ endOff = commState.getLong("endOff"); + - endOff = commState.getLong(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + case 3: - if (buf.remaining() < 8) - return false; ++ startOff = commState.getLong("startOff"); + - startOff = commState.getLong(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + case 4: - if (buf.remaining() < 4) - return false; ++ status = commState.getInt("status"); + - status = commState.getInt(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 69; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridGgfsFileAffinityRange.class, this); + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerRequest.java index 0000000,3224340..37954a3 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerRequest.java @@@ -1,0 -1,212 +1,210 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.fs; + + import org.apache.ignite.internal.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.internal.util.direct.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.internal.util.tostring.*; + + import java.io.*; + import java.nio.*; + import java.util.*; + + /** + * Fragmentizer request. Sent from coordinator to other GGFS nodes when colocated part of file + * should be fragmented. + */ + public class GridGgfsFragmentizerRequest extends GridGgfsCommunicationMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** File id. */ + private IgniteUuid fileId; + + /** Ranges to fragment. */ + @GridToStringInclude + @GridDirectCollection(GridGgfsFileAffinityRange.class) + private Collection<GridGgfsFileAffinityRange> fragmentRanges; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public GridGgfsFragmentizerRequest() { + // No-op. + } + + /** + * @param fileId File id to fragment. + * @param fragmentRanges Ranges to fragment. + */ + public GridGgfsFragmentizerRequest(IgniteUuid fileId, Collection<GridGgfsFileAffinityRange> fragmentRanges) { + this.fileId = fileId; + this.fragmentRanges = fragmentRanges; + } + + /** + * @return File ID. + */ + public IgniteUuid fileId() { + return fileId; + } + + /** + * @return Fragment ranges. + */ + public Collection<GridGgfsFileAffinityRange> fragmentRanges() { + return fragmentRanges; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridGgfsFragmentizerRequest.class, this); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridGgfsFragmentizerRequest _clone = new GridGgfsFragmentizerRequest(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + super.clone0(_msg); + + GridGgfsFragmentizerRequest _clone = (GridGgfsFragmentizerRequest)_msg; + + _clone.fileId = fileId; + _clone.fragmentRanges = fragmentRanges; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.writeTo(buf)) + return false; + + if (!commState.typeWritten) { - if (!commState.putByte(directType())) ++ if (!commState.putByte(null, directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 0: - if (!commState.putGridUuid(fileId)) ++ if (!commState.putGridUuid("fileId", fileId)) + return false; + + commState.idx++; + + case 1: + if (fragmentRanges != null) { + if (commState.it == null) { - if (!commState.putInt(fragmentRanges.size())) ++ if (!commState.putInt(null, fragmentRanges.size())) + return false; + + commState.it = fragmentRanges.iterator(); + } + + while (commState.it.hasNext() || commState.cur != NULL) { + if (commState.cur == NULL) + commState.cur = commState.it.next(); + - if (!commState.putMessage((GridGgfsFileAffinityRange)commState.cur)) ++ if (!commState.putMessage(null, (GridGgfsFileAffinityRange)commState.cur)) + return false; + + commState.cur = NULL; + } + + commState.it = null; + } else { - if (!commState.putInt(-1)) ++ if (!commState.putInt(null, -1)) + return false; + } + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.readFrom(buf)) + return false; + + switch (commState.idx) { + case 0: - IgniteUuid fileId0 = commState.getGridUuid(); ++ fileId = commState.getGridUuid("fileId"); + - if (fileId0 == GRID_UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + - fileId = fileId0; - + commState.idx++; + + case 1: + if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; ++ commState.readSize = commState.getInt(null); + - commState.readSize = commState.getInt(); ++ if (!commState.lastRead()) ++ return false; + } + + if (commState.readSize >= 0) { + if (fragmentRanges == null) + fragmentRanges = new ArrayList<>(commState.readSize); + + for (int i = commState.readItems; i < commState.readSize; i++) { - Object _val = commState.getMessage(); ++ GridGgfsFileAffinityRange _val = (GridGgfsFileAffinityRange)commState.getMessage(null); + - if (_val == MSG_NOT_READ) ++ if (!commState.lastRead()) + return false; + + fragmentRanges.add((GridGgfsFileAffinityRange)_val); + + commState.readItems++; + } + } + + commState.readSize = -1; + commState.readItems = 0; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 70; + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerResponse.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerResponse.java index 0000000,641cdcc..19a04d2 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerResponse.java @@@ -1,0 -1,131 +1,129 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.fs; + + import org.apache.ignite.lang.*; + import org.apache.ignite.internal.util.direct.*; + + import java.io.*; + import java.nio.*; + + /** + * Fragmentizer response. + */ + public class GridGgfsFragmentizerResponse extends GridGgfsCommunicationMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** File ID. */ + private IgniteUuid fileId; + + /** + * Empty constructor required for {@link Externalizable}. + */ + public GridGgfsFragmentizerResponse() { + // No-op. + } + + /** + * @param fileId File ID. + */ + public GridGgfsFragmentizerResponse(IgniteUuid fileId) { + this.fileId = fileId; + } + + /** + * @return File ID. + */ + public IgniteUuid fileId() { + return fileId; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridGgfsFragmentizerResponse _clone = new GridGgfsFragmentizerResponse(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + super.clone0(_msg); + + GridGgfsFragmentizerResponse _clone = (GridGgfsFragmentizerResponse)_msg; + + _clone.fileId = fileId; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.writeTo(buf)) + return false; + + if (!commState.typeWritten) { - if (!commState.putByte(directType())) ++ if (!commState.putByte(null, directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 0: - if (!commState.putGridUuid(fileId)) ++ if (!commState.putGridUuid("fileId", fileId)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.readFrom(buf)) + return false; + + switch (commState.idx) { + case 0: - IgniteUuid fileId0 = commState.getGridUuid(); ++ fileId = commState.getGridUuid("fileId"); + - if (fileId0 == GRID_UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + - fileId = fileId0; - + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 71; + } + }