http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java index 0000000,a4f6488..71ed911 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java @@@ -1,0 -1,335 +1,280 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.ignite.internal.processors.rest.protocols.tcp; + + import org.apache.ignite.*; + import org.apache.ignite.configuration.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.marshaller.*; + import org.apache.ignite.marshaller.jdk.*; + import org.apache.ignite.spi.*; + import org.apache.ignite.client.marshaller.*; + import org.apache.ignite.client.marshaller.jdk.*; + import org.apache.ignite.client.marshaller.optimized.*; + import org.apache.ignite.client.ssl.*; + import org.apache.ignite.internal.processors.rest.*; + import org.apache.ignite.internal.processors.rest.client.message.*; + import org.apache.ignite.internal.processors.rest.protocols.*; -import org.apache.ignite.internal.util.direct.*; + import org.apache.ignite.internal.util.nio.*; + import org.apache.ignite.internal.util.nio.ssl.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.jetbrains.annotations.*; + + import javax.net.ssl.*; + import java.io.*; + import java.net.*; + import java.nio.*; + import java.util.*; + + import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.*; + + /** + * TCP binary protocol implementation. + */ + public class GridTcpRestProtocol extends GridRestProtocolAdapter { + /** Server. */ + private GridNioServer<GridClientMessage> srv; + + /** JDK marshaller. */ + private final IgniteMarshaller jdkMarshaller = new IgniteJdkMarshaller(); + + /** NIO server listener. */ + private GridTcpRestNioListener lsnr; + - /** Message reader. */ - private final GridNioMessageReader msgReader = new GridNioMessageReader() { - @Override public boolean read(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, ByteBuffer buf) { - assert msg != null; - assert buf != null; - - msg.messageReader(this, nodeId); - - return msg.readFrom(buf); - } - - @Nullable @Override public GridTcpMessageFactory messageFactory() { - return null; - } - }; - - /** Message writer. 
*/ - private final GridNioMessageWriter msgWriter = new GridNioMessageWriter() { - @Override public boolean write(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, ByteBuffer buf) { - assert msg != null; - assert buf != null; - - msg.messageWriter(this, nodeId); - - return msg.writeTo(buf); - } - - @Override public int writeFully(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, OutputStream out, - ByteBuffer buf) throws IOException { - assert msg != null; - assert out != null; - assert buf != null; - assert buf.hasArray(); - - msg.messageWriter(this, nodeId); - - boolean finished = false; - int cnt = 0; - - while (!finished) { - finished = msg.writeTo(buf); - - out.write(buf.array(), 0, buf.position()); - - cnt += buf.position(); - - buf.clear(); - } - - return cnt; - } - }; - + /** @param ctx Context. */ + public GridTcpRestProtocol(GridKernalContext ctx) { + super(ctx); + } + + /** + * @return JDK marshaller. + */ + IgniteMarshaller jdkMarshaller() { + return jdkMarshaller; + } + + /** + * Returns marshaller. + * + * @param ses Session. + * @return Marshaller. + */ + GridClientMarshaller marshaller(GridNioSession ses) { + GridClientMarshaller marsh = ses.meta(MARSHALLER.ordinal()); + + assert marsh != null; + + return marsh; + } + + /** + * @param ses Session. + * @return Whether portable marshaller is used. 
+ */ + boolean portableMode(GridNioSession ses) { + return ctx.portable().isPortable(marshaller(ses)); + } + + /** {@inheritDoc} */ + @Override public String name() { + return "TCP binary"; + } + + /** {@inheritDoc} */ + @SuppressWarnings("BusyWait") + @Override public void start(final GridRestProtocolHandler hnd) throws IgniteCheckedException { + assert hnd != null; + + ClientConnectionConfiguration cfg = ctx.config().getClientConnectionConfiguration(); + + assert cfg != null; + + lsnr = new GridTcpRestNioListener(log, this, hnd, ctx); + - GridNioParser parser = new GridTcpRestDirectParser(this, msgReader); ++ GridNioParser parser = new GridTcpRestDirectParser(this); + + try { + host = resolveRestTcpHost(ctx.config()); + + SSLContext sslCtx = null; + + if (cfg.isRestTcpSslEnabled()) { + GridSslContextFactory factory = cfg.getRestTcpSslContextFactory(); + + if (factory == null) + // Thrown SSL exception instead of IgniteCheckedException for writing correct warning message into log. + throw new SSLException("SSL is enabled, but SSL context factory is not specified."); + + sslCtx = factory.createSslContext(); + } + + int lastPort = cfg.getRestTcpPort() + cfg.getRestPortRange() - 1; + + for (int port0 = cfg.getRestTcpPort(); port0 <= lastPort; port0++) { + if (startTcpServer(host, port0, lsnr, parser, sslCtx, cfg)) { + port = port0; + + if (log.isInfoEnabled()) + log.info(startInfo()); + + return; + } + } + + U.warn(log, "Failed to start TCP binary REST server (possibly all ports in range are in use) " + + "[firstPort=" + cfg.getRestTcpPort() + ", lastPort=" + lastPort + ", host=" + host + ']'); + } + catch (SSLException e) { + U.warn(log, "Failed to start " + name() + " protocol on port " + port + ": " + e.getMessage(), + "Failed to start " + name() + " protocol on port " + port + ". 
Check if SSL context factory is " + + "properly configured."); + } + catch (IOException e) { + U.warn(log, "Failed to start " + name() + " protocol on port " + port + ": " + e.getMessage(), + "Failed to start " + name() + " protocol on port " + port + ". " + + "Check restTcpHost configuration property."); + } + } + + /** {@inheritDoc} */ + @Override public void onKernalStart() { + super.onKernalStart(); + + Map<Byte, GridClientMarshaller> marshMap = new HashMap<>(); + + marshMap.put(GridClientOptimizedMarshaller.ID, new GridClientOptimizedMarshaller()); + marshMap.put(GridClientJdkMarshaller.ID, new GridClientJdkMarshaller()); + marshMap.put((byte)0, ctx.portable().portableMarshaller()); + + lsnr.marshallers(marshMap); + } + + /** {@inheritDoc} */ + @Override public void stop() { + if (srv != null) { + ctx.ports().deregisterPorts(getClass()); + + srv.stop(); + } + + if (log.isInfoEnabled()) + log.info(stopInfo()); + } + + /** + * Resolves host for REST TCP server using grid configuration. + * + * @param cfg Grid configuration. + * @return REST host. + * @throws IOException If failed to resolve REST host. + */ + private InetAddress resolveRestTcpHost(IgniteConfiguration cfg) throws IOException { + String host = cfg.getClientConnectionConfiguration().getRestTcpHost(); + + if (host == null) + host = cfg.getLocalHost(); + + return U.resolveLocalHost(host); + } + + /** + * Tries to start server with given parameters. + * + * @param hostAddr Host on which server should be bound. + * @param port Port on which server should be bound. + * @param lsnr Server message listener. + * @param parser Server message parser. + * @param sslCtx SSL context in case if SSL is enabled. + * @param cfg Configuration for other parameters. + * @return {@code True} if server successfully started, {@code false} if port is used and + * server was unable to start. 
+ */ + private boolean startTcpServer(InetAddress hostAddr, int port, GridNioServerListener<GridClientMessage> lsnr, + GridNioParser parser, @Nullable SSLContext sslCtx, ClientConnectionConfiguration cfg) { + try { + GridNioFilter codec = new GridNioCodecFilter(parser, log, true); + + GridNioFilter[] filters; + + if (sslCtx != null) { + GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, log); + + sslFilter.directMode(true); + + boolean auth = cfg.isRestTcpSslClientAuth(); + + sslFilter.wantClientAuth(auth); + + sslFilter.needClientAuth(auth); + + filters = new GridNioFilter[] { + codec, + sslFilter + }; + } + else + filters = new GridNioFilter[] { codec }; + + srv = GridNioServer.<GridClientMessage>builder() + .address(hostAddr) + .port(port) + .listener(lsnr) + .logger(log) + .selectorCount(cfg.getRestTcpSelectorCount()) + .gridName(ctx.gridName()) + .tcpNoDelay(cfg.isRestTcpNoDelay()) + .directBuffer(cfg.isRestTcpDirectBuffer()) + .byteOrder(ByteOrder.nativeOrder()) + .socketSendBufferSize(cfg.getRestTcpSendBufferSize()) + .socketReceiveBufferSize(cfg.getRestTcpReceiveBufferSize()) + .sendQueueLimit(cfg.getRestTcpSendQueueLimit()) + .filters(filters) + .directMode(true) - .messageWriter(msgWriter) + .build(); + + srv.idleTimeout(cfg.getRestIdleTimeout()); + + srv.start(); + + ctx.ports().registerPort(port, IgnitePortProtocol.TCP, getClass()); + + return true; + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to start " + name() + " protocol on port " + port + ": " + e.getMessage()); + + return false; + } + } + + /** {@inheritDoc} */ + @Override protected String getAddressPropertyName() { + return GridNodeAttributes.ATTR_REST_TCP_ADDRS; + } + + /** {@inheritDoc} */ + @Override protected String getHostNamePropertyName() { + return GridNodeAttributes.ATTR_REST_TCP_HOST_NAMES; + } + + /** {@inheritDoc} */ + @Override protected String getPortPropertyName() { + return GridNodeAttributes.ATTR_REST_TCP_PORT; + } + }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerCancelRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerCancelRequest.java index 0000000,0afb745..efc80cb mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerCancelRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerCancelRequest.java @@@ -1,0 -1,123 +1,121 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.streamer; + + import org.apache.ignite.lang.*; + import org.apache.ignite.internal.util.direct.*; + + import java.io.*; + import java.nio.*; + + /** + * Streamer cancel request. + */ + public class GridStreamerCancelRequest extends GridTcpCommunicationMessageAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** Cancelled future ID. */ + private IgniteUuid cancelledFutId; + + /** + * Empty constructor required by {@link Externalizable}. 
+ */ + public GridStreamerCancelRequest() { + // No-op. + } + + /** + * @param cancelledFutId Cancelled future ID. + */ + public GridStreamerCancelRequest(IgniteUuid cancelledFutId) { + this.cancelledFutId = cancelledFutId; + } + + /** + * @return Cancelled future ID. + */ + public IgniteUuid cancelledFutureId() { + return cancelledFutId; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridStreamerCancelRequest _clone = new GridStreamerCancelRequest(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + GridStreamerCancelRequest _clone = (GridStreamerCancelRequest)_msg; + + _clone.cancelledFutId = cancelledFutId; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!commState.typeWritten) { - if (!commState.putByte(directType())) ++ if (!commState.putByte(null, directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 0: - if (!commState.putGridUuid(cancelledFutId)) ++ if (!commState.putGridUuid("cancelledFutId", cancelledFutId)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + switch (commState.idx) { + case 0: - IgniteUuid cancelledFutId0 = commState.getGridUuid(); ++ cancelledFutId = commState.getGridUuid("cancelledFutId"); + - if (cancelledFutId0 == GRID_UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + - cancelledFutId = cancelledFutId0; - + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 75; + } + } 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionRequest.java index 0000000,5e9f937..e62a003 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionRequest.java @@@ -1,0 -1,382 +1,374 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.ignite.internal.processors.streamer; + + import org.apache.ignite.configuration.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.internal.util.direct.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.internal.util.tostring.*; + import org.jetbrains.annotations.*; + + import java.nio.*; + import java.util.*; + + /** + * + */ + public class GridStreamerExecutionRequest extends GridTcpCommunicationMessageAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** Force local deployment flag. */ + private boolean forceLocDep; + + /** Serialized batch in case if P2P class loading is enabled. */ + @GridToStringExclude + private byte[] batchBytes; + + /** Deployment mode. */ + private IgniteDeploymentMode depMode; + + /** Deployment sample class name. */ + private String sampleClsName; + + /** Deployment user version. */ + private String userVer; + + /** Node class loader participants. */ + @GridToStringInclude + @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class) + private Map<UUID, IgniteUuid> ldrParticipants; + + /** Class loader ID. */ + private IgniteUuid clsLdrId; + + /** + * + */ + public GridStreamerExecutionRequest() { + // No-op. + } + + /** + * @param forceLocDep Force local deployment flag. + * @param batchBytes Batch serialized bytes. + * @param depMode Deployment mode. + * @param sampleClsName Sample class name. + * @param userVer User version. + * @param ldrParticipants Loader participants. + * @param clsLdrId Class loader ID. 
+ */ + public GridStreamerExecutionRequest( + boolean forceLocDep, + byte[] batchBytes, + @Nullable IgniteDeploymentMode depMode, + @Nullable String sampleClsName, + @Nullable String userVer, + @Nullable Map<UUID, IgniteUuid> ldrParticipants, + @Nullable IgniteUuid clsLdrId + ) { + assert batchBytes != null; + + this.forceLocDep = forceLocDep; + this.batchBytes = batchBytes; + this.depMode = depMode; + this.sampleClsName = sampleClsName; + this.userVer = userVer; + this.ldrParticipants = ldrParticipants; + this.clsLdrId = clsLdrId; + } + + /** + * @return Force local deployment flag. + */ + public boolean forceLocalDeployment() { + return forceLocDep; + } + + /** + * @return Deployment mode. + */ + public IgniteDeploymentMode deploymentMode() { + return depMode; + } + + /** + * @return Deployment sample class name. + */ + public String sampleClassName() { + return sampleClsName; + } + + /** + * @return Deployment user version. + */ + public String userVersion() { + return userVer; + } + + /** + * @return Node class loader participants. + */ + public Map<UUID, IgniteUuid> loaderParticipants() { + return ldrParticipants; + } + + /** + * @return Class loader ID. + */ + public IgniteUuid classLoaderId() { + return clsLdrId; + } + + /** + * @return Serialized batch in case if P2P class loading is enabled. 
+ */ + public byte[] batchBytes() { + return batchBytes; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridStreamerExecutionRequest.class, this); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridStreamerExecutionRequest _clone = new GridStreamerExecutionRequest(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + GridStreamerExecutionRequest _clone = (GridStreamerExecutionRequest)_msg; + + _clone.forceLocDep = forceLocDep; + _clone.batchBytes = batchBytes; + _clone.depMode = depMode; + _clone.sampleClsName = sampleClsName; + _clone.userVer = userVer; + _clone.ldrParticipants = ldrParticipants; + _clone.clsLdrId = clsLdrId; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!commState.typeWritten) { - if (!commState.putByte(directType())) ++ if (!commState.putByte(null, directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 0: - if (!commState.putByteArray(batchBytes)) ++ if (!commState.putByteArray("batchBytes", batchBytes)) + return false; + + commState.idx++; + + case 1: - if (!commState.putGridUuid(clsLdrId)) ++ if (!commState.putGridUuid("clsLdrId", clsLdrId)) + return false; + + commState.idx++; + + case 2: - if (!commState.putEnum(depMode)) ++ if (!commState.putEnum("depMode", depMode)) + return false; + + commState.idx++; + + case 3: - if (!commState.putBoolean(forceLocDep)) ++ if (!commState.putBoolean("forceLocDep", forceLocDep)) + return false; + + commState.idx++; + + case 4: + if (ldrParticipants != null) { + if (commState.it == null) { - if (!commState.putInt(ldrParticipants.size())) ++ if (!commState.putInt(null, ldrParticipants.size())) + return 
false; + + commState.it = ldrParticipants.entrySet().iterator(); + } + + while (commState.it.hasNext() || commState.cur != NULL) { + if (commState.cur == NULL) + commState.cur = commState.it.next(); + + Map.Entry<UUID, IgniteUuid> e = (Map.Entry<UUID, IgniteUuid>)commState.cur; + + if (!commState.keyDone) { - if (!commState.putUuid(e.getKey())) ++ if (!commState.putUuid(null, e.getKey())) + return false; + + commState.keyDone = true; + } + - if (!commState.putGridUuid(e.getValue())) ++ if (!commState.putGridUuid(null, e.getValue())) + return false; + + commState.keyDone = false; + + commState.cur = NULL; + } + + commState.it = null; + } else { - if (!commState.putInt(-1)) ++ if (!commState.putInt(null, -1)) + return false; + } + + commState.idx++; + + case 5: - if (!commState.putString(sampleClsName)) ++ if (!commState.putString("sampleClsName", sampleClsName)) + return false; + + commState.idx++; + + case 6: - if (!commState.putString(userVer)) ++ if (!commState.putString("userVer", userVer)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + switch (commState.idx) { + case 0: - byte[] batchBytes0 = commState.getByteArray(); ++ batchBytes = commState.getByteArray("batchBytes"); + - if (batchBytes0 == BYTE_ARR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - batchBytes = batchBytes0; - + commState.idx++; + + case 1: - IgniteUuid clsLdrId0 = commState.getGridUuid(); ++ clsLdrId = commState.getGridUuid("clsLdrId"); + - if (clsLdrId0 == GRID_UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + - clsLdrId = clsLdrId0; - + commState.idx++; + + case 2: - if (buf.remaining() < 1) - return false; ++ byte depMode0 = commState.getByte("depMode"); + - byte depMode0 = commState.getByte(); ++ if (!commState.lastRead()) ++ return false; + + depMode = IgniteDeploymentMode.fromOrdinal(depMode0); + + 
commState.idx++; + + case 3: - if (buf.remaining() < 1) - return false; ++ forceLocDep = commState.getBoolean("forceLocDep"); + - forceLocDep = commState.getBoolean(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + case 4: + if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; ++ commState.readSize = commState.getInt(null); + - commState.readSize = commState.getInt(); ++ if (!commState.lastRead()) ++ return false; + } + + if (commState.readSize >= 0) { + if (ldrParticipants == null) - ldrParticipants = U.newHashMap(commState.readSize); ++ ldrParticipants = new HashMap<>(commState.readSize, 1.0f); + + for (int i = commState.readItems; i < commState.readSize; i++) { + if (!commState.keyDone) { - UUID _val = commState.getUuid(); ++ UUID _val = commState.getUuid(null); + - if (_val == UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + + commState.cur = _val; + commState.keyDone = true; + } + - IgniteUuid _val = commState.getGridUuid(); ++ IgniteUuid _val = commState.getGridUuid(null); + - if (_val == GRID_UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + + ldrParticipants.put((UUID)commState.cur, _val); + + commState.keyDone = false; + + commState.readItems++; + } + } + + commState.readSize = -1; + commState.readItems = 0; + commState.cur = null; + + commState.idx++; + + case 5: - String sampleClsName0 = commState.getString(); ++ sampleClsName = commState.getString("sampleClsName"); + - if (sampleClsName0 == STR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - sampleClsName = sampleClsName0; - + commState.idx++; + + case 6: - String userVer0 = commState.getString(); ++ userVer = commState.getString("userVer"); + - if (userVer0 == STR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - userVer = userVer0; - + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 76; + } + } 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerResponse.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerResponse.java index 0000000,7ca0e36..fdd21df mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerResponse.java @@@ -1,0 -1,160 +1,156 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.ignite.internal.processors.streamer; + + import org.apache.ignite.lang.*; + import org.apache.ignite.internal.util.direct.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.jetbrains.annotations.*; + + import java.nio.*; + + /** + * + */ + public class GridStreamerResponse extends GridTcpCommunicationMessageAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private IgniteUuid futId; + + /** */ + private byte[] errBytes; + + /** + * + */ + public GridStreamerResponse() { + // No-op. + } + + /** + * @param futId Future ID. + * @param errBytes Serialized error, if any. + */ + public GridStreamerResponse(IgniteUuid futId, @Nullable byte[] errBytes) { + assert futId != null; + + this.futId = futId; + this.errBytes = errBytes; + } + + /** + * @return Future ID. + */ + public IgniteUuid futureId() { + return futId; + } + + /** + * @return Serialized error. + */ + public byte[] errorBytes() { + return errBytes; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridStreamerResponse.class, this); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridStreamerResponse _clone = new GridStreamerResponse(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + GridStreamerResponse _clone = (GridStreamerResponse)_msg; + + _clone.futId = futId; + _clone.errBytes = errBytes; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!commState.typeWritten) { - if (!commState.putByte(directType())) ++ if (!commState.putByte(null, directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 0: - if 
(!commState.putByteArray(errBytes)) ++ if (!commState.putByteArray("errBytes", errBytes)) + return false; + + commState.idx++; + + case 1: - if (!commState.putGridUuid(futId)) ++ if (!commState.putGridUuid("futId", futId)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + switch (commState.idx) { + case 0: - byte[] errBytes0 = commState.getByteArray(); ++ errBytes = commState.getByteArray("errBytes"); + - if (errBytes0 == BYTE_ARR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - errBytes = errBytes0; - + commState.idx++; + + case 1: - IgniteUuid futId0 = commState.getGridUuid(); ++ futId = commState.getGridUuid("futId"); + - if (futId0 == GRID_UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + - futId = futId0; - + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 77; + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java index 0000000,bce75f6..d167e55 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java @@@ -1,0 -1,500 +1,504 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.util;

import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;

import java.io.*;
import java.util.*;

/**
 * Minimal list API to work with primitive longs. This list exists
 * to avoid boxing/unboxing when using standard list from Java.
 * <p>
 * Elements are kept in a {@code long[]} of which only the first {@link #size()} slots are meaningful.
 * The backing array may be {@code null} (list created by the default constructor and never written to)
 * or have zero length (list created from an empty array or deserialized from an empty list); both
 * states represent an empty list and are handled by every method.
 */
public class GridLongList implements Externalizable {
    /** */
    private static final long serialVersionUID = 0L;

    /** Capacity allocated on the first insertion when there is no usable backing array yet. */
    private static final int INIT_CAP = 4;

    /** Backing array, may be {@code null}. Only the first {@link #idx} elements are valid. */
    private long[] arr;

    /** Number of elements stored, also the position the next added element is written to. */
    private int idx;

    /**
     * Creates an empty list without allocating a backing array.
     */
    public GridLongList() {
        // No-op.
    }

    /**
     * Creates an empty list with preallocated capacity.
     *
     * @param size Initial capacity (not the number of elements).
     */
    public GridLongList(int size) {
        arr = new long[size];
        // idx = 0
    }

    /**
     * Creates a list that wraps the given array without copying it; all array elements become list elements.
     *
     * @param arr Array.
     */
    public GridLongList(long[] arr) {
        this.arr = arr;

        idx = arr.length;
    }

    /**
     * Creates a list from the given values. The passed array is wrapped, not copied.
     *
     * @param vals Values.
     * @return List from values, or a new empty list if {@code vals} is empty according to {@code F.isEmpty}.
     */
    public static GridLongList asList(long... vals) {
        if (F.isEmpty(vals))
            return new GridLongList();

        return new GridLongList(vals);
    }

    /**
     * Wraps the given array, treating only its first {@code size} elements as list content.
     *
     * @param arr Array.
     * @param size Size.
     */
    private GridLongList(long[] arr, int size) {
        this.arr = arr;
        idx = size;
    }

    /**
     * @return Copy of this list backed by its own array.
     */
    public GridLongList copy() {
        if (idx == 0)
            return new GridLongList();

        return new GridLongList(Arrays.copyOf(arr, idx));
    }

    /** {@inheritDoc} */
    @Override public boolean equals(Object o) {
        if (this == o)
            return true;

        if (!(o instanceof GridLongList))
            return false;

        GridLongList that = (GridLongList)o;

        if (idx != that.idx)
            return false;

        // Empty lists are equal regardless of their backing arrays (either may be null).
        if (idx == 0 || arr == that.arr)
            return true;

        for (int i = 0; i < idx; i++) {
            if (arr[i] != that.arr[i])
                return false;
        }

        return true;
    }

    /** {@inheritDoc} */
    @Override public int hashCode() {
        int res = 1;

        for (int i = 0; i < idx; i++) {
            long element = arr[i];
            int elementHash = (int)(element ^ (element >>> 32));
            res = 31 * res + elementHash;
        }

        return res;
    }

    /**
     * Appends all elements of the given list to this list.
     *
     * @param l List to add all elements of.
     */
    public void addAll(GridLongList l) {
        assert l != null;

        // Read the count up front: if 'l' is this very list, 'idx' must not be re-read while appending.
        int cnt = l.size();

        if (cnt == 0)
            return;

        ensureCapacity(idx + cnt);

        System.arraycopy(l.arr, 0, arr, idx, cnt);

        idx += cnt;
    }

    /**
     * Add element to this array.
     *
     * @param x Value.
     */
    public void add(long x) {
        ensureCapacity(idx + 1);

        arr[idx++] = x;
    }

    /**
     * Makes sure the backing array can hold at least {@code minCap} elements, growing it by doubling.
     * A missing or zero-length array is treated as having no capacity and is replaced by an array of at
     * least {@link #INIT_CAP} slots (doubling a zero length would never make progress).
     *
     * @param minCap Required capacity.
     */
    private void ensureCapacity(int minCap) {
        int cap = arr == null ? 0 : arr.length;

        if (cap >= minCap)
            return;

        int newCap = cap == 0 ? INIT_CAP : cap;

        while (newCap < minCap) {
            newCap <<= 1;

            if (newCap <= 0) { // Doubling overflowed int, fall back to the exact requested capacity.
                newCap = minCap;

                break;
            }
        }

        arr = arr == null ? new long[newCap] : Arrays.copyOf(arr, newCap);
    }

    /**
     * Clears the list. The backing array is retained.
     */
    public void clear() {
        idx = 0;
    }

    /**
     * Gets the last element.
     *
     * @return The last element.
     * @throws NoSuchElementException If the list is empty.
     */
    public long last() {
        if (idx == 0)
            throw new NoSuchElementException();

        return arr[idx - 1];
    }

    /**
     * Removes and returns the last element of the list. Complementary method to {@link #add(long)} for stack like usage.
     *
     * @return Removed element.
     * @throws NoSuchElementException If the list is empty.
     */
    public long remove() throws NoSuchElementException {
        if (idx == 0)
            throw new NoSuchElementException();

        return arr[--idx];
    }

    /**
     * Returns copy of this list, excluding all elements of given list. Every occurrence of an excluded
     * value is dropped; retained elements keep their relative order.
     *
     * @param l List of elements to remove.
     * @return New list without all elements from {@code l}.
     */
    public GridLongList copyWithout(GridLongList l) {
        assert l != null;

        if (idx == 0 || l.idx == 0)
            return copy();

        long[] newArr = new long[idx];
        int newIdx = 0;

        for (int i = 0; i < idx; i++) {
            long val = arr[i];

            if (!l.contains(val))
                newArr[newIdx++] = val;
        }

        return new GridLongList(newArr, newIdx);
    }

    /**
     * @param i Index.
     * @return Value.
     */
    public long get(int i) {
        assert i < idx;

        return arr[i];
    }

    /**
     * @return Size.
     */
    public int size() {
        return idx;
    }

    /**
     * @return {@code True} if this list has no elements.
     */
    public boolean isEmpty() {
        return idx == 0;
    }

    /**
     * @param l Element to find.
     * @return {@code True} if found.
     */
    public boolean contains(long l) {
        for (int i = 0; i < idx; i++) {
            if (arr[i] == l)
                return true;
        }

        return false;
    }

    /**
     * @param l List to check.
     * @return {@code True} if this list contains all the elements of passed in list.
     */
    public boolean containsAll(GridLongList l) {
        for (int i = 0; i < l.size(); i++) {
            if (!contains(l.get(i)))
                return false;
        }

        return true;
    }

    /**
     * @return {@code True} if there are no duplicates.
     */
    public boolean distinct() {
        for (int i = 0; i < idx; i++) {
            for (int j = i + 1; j < idx; j++) {
                if (arr[i] == arr[j])
                    return false;
            }
        }

        return true;
    }

    /**
     * Shrinks the list to the given size.
     *
     * @param size New size.
     * @param last If {@code true} the last elements will be removed, otherwise the first.
     */
    public void truncate(int size, boolean last) {
        assert size >= 0 && size <= idx;

        if (size == idx)
            return;

        // Dropping the head: shift the surviving tail to the beginning of the array.
        if (!last && idx != 0 && size != 0)
            System.arraycopy(arr, idx - size, arr, 0, size);

        idx = size;
    }

    /**
     * Removes element by given index, shifting subsequent elements one position to the left.
     *
     * @param i Index.
     * @return Removed value.
     */
    public long removeIndex(int i) {
        assert i < idx : i;

        long res = arr[i];

        if (i == idx - 1) { // Last element.
            idx = i;
        }
        else {
            System.arraycopy(arr, i + 1, arr, i, idx - i - 1);
            idx--;
        }

        return res;
    }

    /**
     * Removes the first occurrence of the value found at or after the given index.
     *
     * @param startIdx Index to begin search with.
     * @param val Value.
     * @return Index of removed value if the value was found and removed or {@code -1} otherwise.
     */
    public int removeValue(int startIdx, long val) {
        assert startIdx >= 0;

        for (int i = startIdx; i < idx; i++) {
            if (arr[i] == val) {
                removeIndex(i);

                return i;
            }
        }

        return -1;
    }

    /**
     * Replaces the first occurrence of the old value found at or after the given index with the new value.
     *
     * @param startIdx Index to begin search with.
     * @param oldVal Old value.
     * @param newVal New value.
     * @return Index of replaced value if the value was found and replaced or {@code -1} otherwise.
     */
    public int replaceValue(int startIdx, long oldVal, long newVal) {
        for (int i = startIdx; i < idx; i++) {
            if (arr[i] == oldVal) {
                arr[i] = newVal;

                return i;
            }
        }

        return -1;
    }

    /**
     * @return Array copy. Its length equals {@link #size()}; modifying it does not affect this list.
     */
    public long[] array() {
        // The backing array may be null for an empty list, so it must not be touched in that case.
        if (idx == 0)
            return new long[0];

        return Arrays.copyOf(arr, idx);
    }

    /** {@inheritDoc} */
    @Override public void writeExternal(ObjectOutput out) throws IOException {
        out.writeInt(idx);

        for (int i = 0; i < idx; i++)
            out.writeLong(arr[i]);
    }

    /** {@inheritDoc} */
    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        idx = in.readInt();

        arr = new long[idx];

        for (int i = 0; i < idx; i++)
            arr[i] = in.readLong();
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        SB b = new SB("[");

        for (int i = 0; i < idx; i++) {
            if (i != 0)
                b.a(',');

            b.a(arr[i]);
        }

        b.a(']');

        return S.toString(GridLongList.class, this, "arr", b);
    }

    /**
     * Reads a list in the format produced by {@link #writeTo(DataOutput, GridLongList)}: element count
     * (or {@code -1} for a {@code null} list) followed by the elements.
     *
     * @param in Input to read list from.
     * @return Grid long list, or {@code null} if the stored count is {@code -1}.
     * @throws IOException If failed.
     */
    @Nullable public static GridLongList readFrom(DataInput in) throws IOException {
        int idx = in.readInt();

        if (idx == -1)
            return null;

        long[] arr = new long[idx];

        for (int i = 0; i < idx; i++)
            arr[i] = in.readLong();

        return new GridLongList(arr);
    }

    /**
     * Writes element count followed by the elements; a {@code null} list is written as count {@code -1}.
     *
     * @param out Output to write to.
     * @param list List.
     * @throws IOException If failed.
     */
    public static void writeTo(DataOutput out, @Nullable GridLongList list) throws IOException {
        out.writeInt(list != null ? list.idx : -1);

        if (list != null) {
            for (int i = 0; i < list.idx; i++)
                out.writeLong(list.arr[i]);
        }
    }

    /**
     * Appends all elements of {@code from} to {@code to}, creating the target list if it is {@code null}.
     *
     * @param to To list.
     * @param from From list.
     * @return To list (passed in or created).
     */
    public static GridLongList addAll(@Nullable GridLongList to, GridLongList from) {
        if (to == null) {
            GridLongList res = new GridLongList(from.size());

            res.addAll(from);

            return res;
        }
        else {
            to.addAll(from);

            return to;
        }
    }

    /**
     * Sorts this list in place in ascending order.
     * Use {@code copy().sort()} if you need a defensive copy.
     *
     * @return {@code this} For chaining.
     */
    public GridLongList sort() {
        if (idx > 1)
            Arrays.sort(arr, 0, idx);

        return this;
    }

    /**
     * Removes given number of elements from the end. If the given number of elements is higher than
     * list size, then list will be cleared.
     *
     * @param cnt Count to pop from the end.
     */
    public void pop(int cnt) {
        assert cnt >= 0 : cnt;

        if (idx < cnt)
            idx = 0;
        else
            idx -= cnt;
    }
}