http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheRestResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheRestResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheRestResponse.java new file mode 100644 index 0000000..aa52f47 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheRestResponse.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.handlers.cache; + +import org.apache.ignite.internal.processors.rest.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * Adds affinity node ID to cache responses. + */ +public class GridCacheRestResponse extends GridRestResponse { + /** */ + private static final long serialVersionUID = 0L; + + /** Affinity node ID. */ + private String affinityNodeId; + + /** + * @return Affinity node ID. 
+ */ + public String getAffinityNodeId() { + return affinityNodeId; + } + + /** + * @param affinityNodeId Affinity node ID. + */ + public void setAffinityNodeId(String affinityNodeId) { + this.affinityNodeId = affinityNodeId; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridCacheRestResponse.class, this, super.toString()); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + super.writeExternal(out); + + U.writeString(out, affinityNodeId); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + super.readExternal(in); + + affinityNodeId = U.readString(in); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/package.html new file mode 100644 index 0000000..529ad66 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/package.html @@ -0,0 +1,23 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <!-- Package description. --> + REST in-memory data grid commands. 
+</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/log/GridLogCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/log/GridLogCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/log/GridLogCommandHandler.java new file mode 100644 index 0000000..3c65b6d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/log/GridLogCommandHandler.java @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.rest.handlers.log; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.processors.rest.*; +import org.apache.ignite.internal.processors.rest.handlers.*; +import org.apache.ignite.internal.processors.rest.request.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.io.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.net.*; +import java.util.*; + +import static org.apache.ignite.internal.processors.rest.GridRestCommand.*; + +/** + * Handler for {@link GridRestCommand#LOG} command. + */ +public class GridLogCommandHandler extends GridRestCommandHandlerAdapter { + /** Supported commands. */ + private static final Collection<GridRestCommand> SUPPORTED_COMMANDS = U.sealList(LOG); + + /** Default log path. */ + private static final String DFLT_PATH = "work/log/gridgain.log"; + + /** Approximate line length. */ + private static final int LINE_LENGTH = 120; + + /** Folders accessible for log reading. */ + private List<File> accessibleFolders; + + /** @param ctx Context. 
*/ + public GridLogCommandHandler(GridKernalContext ctx) { + super(ctx); + + assert ctx.config().getClientConnectionConfiguration() != null; + + String[] accessiblePaths = ctx.config().getClientConnectionConfiguration().getRestAccessibleFolders(); + + // Fall back to the GridGain home folder when no accessible folders are configured. + if (accessiblePaths == null) { + String ggHome = U.getGridGainHome(); + + if (ggHome != null) + accessiblePaths = new String[] {ggHome}; + } + + if (accessiblePaths != null) { + accessibleFolders = new ArrayList<>(); + + for (String accessiblePath : accessiblePaths) + accessibleFolders.add(new File(accessiblePath)); + } + else if (log.isDebugEnabled()) + log.debug("Neither restAccessibleFolders nor GRIDGAIN_HOME properties are set, will not restrict " + + "log files access"); + } + + /** {@inheritDoc} */ + @Override public Collection<GridRestCommand> supportedCommands() { + return SUPPORTED_COMMANDS; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<GridRestResponse> handleAsync(GridRestRequest req) { + assert req instanceof GridRestLogRequest : "Invalid command for log handler: " + req; + + assert SUPPORTED_COMMANDS.contains(req.command()); + + GridRestLogRequest req0 = (GridRestLogRequest) req; + + String path = req0.path(); + + int from = req0.from(); + int to = req0.to(); + + // Use the default log location when the request does not name a file. + if (path == null) + path = DFLT_PATH; + + try { + return new GridFinishedFuture<>(ctx, new GridRestResponse(readLog(path, from, to))); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(ctx, e); + } + catch (IOException e) { + return new GridFinishedFuture<>(ctx, e); + } + } + + /** + * Reads log. + * + * @param path Path where the log to read is located. + * @param from Number of the line to start from. + * @param to Number of the line to finish on. + * @return List of read lines. + * @throws IgniteCheckedException If arguments are illegal. + * @throws IOException If file couldn't be accessed or read failed.
+ */ + private List<String> readLog(String path, int from, int to) throws IgniteCheckedException, IOException { + URL url = U.resolveGridGainUrl(path); + + if (url == null) + throw new IgniteCheckedException("Log file not found: " + path); + + if (!isAccessible(url)) + throw new IgniteCheckedException("File is not accessible through REST" + + " (check restAccessibleFolders configuration property): " + path); + + if (from >= 0 && to >= 0) + return readLinesForward(url, from, to); + else if (from < 0 && to < 0) + return readLinesBackward(url, from, to); + else + throw new IgniteCheckedException( + "Illegal arguments (both should be positive or negative) [from=" + from + ", to=" + to + ']'); + } + + /** + * Read lines from log backwards. + * + * @param url URL of the log. + * @param from Number of the line to start from. Should be negative, representing number of line from the end. + * @param to Number of the line to finish on. Should be negative, representing number of line from the end. + * @return List of read lines. + * @throws IgniteCheckedException If arguments are illegal. + * @throws IOException If file couldn't be accessed or read failed.
+ */ + @SuppressWarnings("TooBroadScope") + private List<String> readLinesBackward(URL url, final int from, final int to) throws IgniteCheckedException, IOException { + File file = new File(url.getFile()); + + if (!file.exists() || !file.isFile()) + throw new IgniteCheckedException("File doesn't exists: " + url); + + int linesToRead = to - from + 1; + int linesRead = 0; + + if (linesToRead <= 0) + return Collections.emptyList(); + + LinkedList<String> lines = new LinkedList<>(); + + RandomAccessFile raf = null; + + try { + raf = new RandomAccessFile(file, "r"); + + // Buffer size is computed in long arithmetic: an int product of a large requested range + // could overflow to a negative or zero size, failing allocation or stalling the read loop. + byte[] buf = new byte[(int)Math.min(16 * 1024L, (long)linesToRead * LINE_LENGTH)]; + + long endPos = raf.length(); + + String lastLineEnding = null; + + do { + long startPos = endPos - buf.length; + + if (startPos < 0) + startPos = 0; + + raf.seek(startPos); + + // Limiting number of bytes read to protect from line duplication near file start. + int bytesRead = raf.read(buf, 0, (int)(endPos - startPos)); + + Scanner rdr = new Scanner(new GridByteArrayInputStream(buf, 0, bytesRead)); + + // Read lines into temporary, forward ordered collection. + List<String> tmpLines = new LinkedList<>(); + + boolean firstLine = true; + + // Temporary variable to keep a new lastLineEnding value + // while old is still required. + String fst = null; + + while (rdr.hasNextLine()) { + String line = rdr.nextLine(); + + // Skip the first line as it could be incomplete. + if (firstLine) { + firstLine = false; + + // If we started from the beginning add it. + if (startPos > 0) + fst = lastLineEnding != null && !rdr.hasNextLine() ? line + lastLineEnding : line; + else + tmpLines.add(lastLineEnding != null ? line + lastLineEnding : line); + } + else if (rdr.hasNextLine()) + // If it's a last line in buffer add previously read part. + tmpLines.add(line); + else + tmpLines.add(lastLineEnding != null ? line + lastLineEnding : line); + } + + lastLineEnding = fst; + + // Limit next read to end of the first line.
+ endPos = startPos; + + // Save lines, if they are requested, in backward order into result collection. + for (ListIterator<String> it = tmpLines.listIterator(tmpLines.size()); it.hasPrevious(); ) { + linesRead++; + + String prev = it.previous(); + + if ((linesRead >= -to) && (linesRead <= -from)) + lines.addFirst(prev); + } + } while (linesRead < -from && endPos > 0); + } + finally { + U.close(raf, log); + } + + return lines; + } + + /** + * Reads log forward, using {@link Reader} API. + * + * @param url URL of the log file. + * @param from Number of line to start from. + * @param to Number tof line to finish on. + * @return List of read lines. + * @throws IOException If file couldn't be accessed or read failed. + */ + private List<String> readLinesForward(URL url, int from, int to) throws IOException { + BufferedReader reader = null; + + try { + reader = new BufferedReader(new InputStreamReader(url.openStream())); + + List<String> lines = new LinkedList<>(); + + String line; + + int i = 0; + + while ((line = reader.readLine()) != null) { + i++; + + if (from != -1 && i - 1 < from) + continue; + + if (to != -1 && i - 1 > to) + break; + + lines.add(line); + } + + return lines; + } + finally { + U.close(reader, log); + } + } + + /** + * Checks whether given url is accessible against configuration. + * + * @param url URL to check. + * @return {@code True} if file is accessible (i.e. located in one of the sub-folders of + * {@code restAccessibleFolders} list. + */ + private boolean isAccessible(URL url) throws IOException { + // No check is made if configuration is undefined. 
+ if (accessibleFolders == null) + return true; + + File f = new File(url.getFile()); + + f = f.getCanonicalFile(); + + do { + if (F.contains(accessibleFolders, f)) + return true; + + f = f.getParentFile(); + } + while (f != null); + + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/log/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/log/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/log/package.html new file mode 100644 index 0000000..87386a0 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/log/package.html @@ -0,0 +1,23 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> +<!-- Package description. --> + REST log commands. 
+</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/metadata/GridPortableMetadataHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/metadata/GridPortableMetadataHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/metadata/GridPortableMetadataHandler.java new file mode 100644 index 0000000..b0a3f10 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/metadata/GridPortableMetadataHandler.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.rest.handlers.metadata; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.portables.*; +import org.apache.ignite.internal.processors.rest.*; +import org.apache.ignite.internal.processors.rest.client.message.*; +import org.apache.ignite.internal.processors.rest.handlers.*; +import org.apache.ignite.internal.processors.rest.request.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; + +import static org.apache.ignite.internal.processors.rest.GridRestCommand.*; + +/** + * Portable metadata handler. + */ +public class GridPortableMetadataHandler extends GridRestCommandHandlerAdapter { + /** Supported commands. */ + private static final Collection<GridRestCommand> SUPPORTED_COMMANDS = U.sealList( + PUT_PORTABLE_METADATA, + GET_PORTABLE_METADATA + ); + + /** + * @param ctx Context. 
+ */ + public GridPortableMetadataHandler(GridKernalContext ctx) { + super(ctx); + } + + /** {@inheritDoc} */ + @Override public Collection<GridRestCommand> supportedCommands() { + return SUPPORTED_COMMANDS; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<GridRestResponse> handleAsync(GridRestRequest req) { + assert SUPPORTED_COMMANDS.contains(req.command()) : req.command(); + + try { + if (req.command() == GET_PORTABLE_METADATA) { + GridRestPortableGetMetaDataRequest metaReq = (GridRestPortableGetMetaDataRequest)req; + + GridRestResponse res = new GridRestResponse(); + + Map<Integer, PortableMetadata> meta = ctx.portable().metadata(metaReq.typeIds()); + + GridClientMetaDataResponse metaRes = new GridClientMetaDataResponse(); + + metaRes.metaData(meta); + + res.setResponse(metaRes); + + return new GridFinishedFuture<>(ctx, res); + } + else { + assert req.command() == PUT_PORTABLE_METADATA; + + GridRestPortablePutMetaDataRequest metaReq = (GridRestPortablePutMetaDataRequest)req; + + for (GridClientPortableMetaData meta : metaReq.metaData()) + ctx.portable().updateMetaData(meta.typeId(), + meta.typeName(), + meta.affinityKeyFieldName(), + meta.fields()); + + GridRestResponse res = new GridRestResponse(); + + res.setResponse(true); + + return new GridFinishedFuture<>(ctx, res); + } + } + catch (IgniteException e) { + return new GridFinishedFuture<>(ctx, e); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridPortableMetadataHandler.class, this, super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/package.html new file mode 
100644 index 0000000..1c2d263 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/package.html @@ -0,0 +1,23 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <!-- Package description. --> + REST commands handlers. +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java new file mode 100644 index 0000000..c3efcaa --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java @@ -0,0 +1,638 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.handlers.task; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.portables.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.managers.eventstorage.*; +import org.apache.ignite.internal.processors.rest.*; +import org.apache.ignite.internal.processors.rest.client.message.*; +import org.apache.ignite.internal.processors.rest.handlers.*; +import org.apache.ignite.internal.processors.rest.request.*; +import org.apache.ignite.internal.processors.task.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.IgniteSystemProperties.*; +import static 
org.apache.ignite.events.IgniteEventType.*; +import static org.apache.ignite.internal.GridTopic.*; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; +import static org.apache.ignite.internal.processors.rest.GridRestCommand.*; +import static org.jdk8.backport.ConcurrentLinkedHashMap.QueuePolicy.*; + +/** + * Command handler for API requests. + */ +public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter { + /** Supported commands. */ + private static final Collection<GridRestCommand> SUPPORTED_COMMANDS = U.sealList(EXE, RESULT, NOOP); + + /** Default maximum number of task results. */ + private static final int DFLT_MAX_TASK_RESULTS = 10240; + + /** Maximum number of task results. */ + private final int maxTaskResults = getInteger(GG_REST_MAX_TASK_RESULTS, DFLT_MAX_TASK_RESULTS); + + /** Task results. */ + private final Map<IgniteUuid, TaskDescriptor> taskDescs = + new GridBoundedConcurrentLinkedHashMap<>(maxTaskResults, 16, 0.75f, 4, PER_SEGMENT_Q); + + /** Topic ID generator. */ + private final AtomicLong topicIdGen = new AtomicLong(); + + /** + * @param ctx Context. 
+ */ + public GridTaskCommandHandler(final GridKernalContext ctx) { + super(ctx); + + ctx.io().addMessageListener(TOPIC_REST, new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object msg) { + if (!(msg instanceof GridTaskResultRequest)) { + U.warn(log, "Received unexpected message instead of task result request: " + msg); + + return; + } + + try { + GridTaskResultRequest req = (GridTaskResultRequest)msg; + + GridTaskResultResponse res = new GridTaskResultResponse(); + + IgniteUuid taskId = req.taskId(); + + TaskDescriptor desc = taskDescs.get(taskId); + + if (desc != null) { + res.found(true); + res.finished(desc.finished()); + + Throwable err = desc.error(); + + if (err != null) + res.error(err.getMessage()); + else { + res.result(desc.result()); + res.resultBytes(ctx.config().getMarshaller().marshal(desc.result())); + } + } + else + res.found(false); + + Object topic = ctx.config().getMarshaller().unmarshal(req.topicBytes(), null); + + ctx.io().send(nodeId, topic, res, SYSTEM_POOL); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send job task result response.", e); + } + } + }); + } + + /** {@inheritDoc} */ + @Override public Collection<GridRestCommand> supportedCommands() { + return SUPPORTED_COMMANDS; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<GridRestResponse> handleAsync(GridRestRequest req) { + try { + return handleAsyncUnsafe(req); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to execute task command: " + req, e); + + return new GridFinishedFuture<>(ctx, e); + } + finally { + if (log.isDebugEnabled()) + log.debug("Handled task REST request: " + req); + } + } + + /** + * @param req Request. + * @return Future. + * @throws IgniteCheckedException On any handling exception. 
+ */ + private IgniteFuture<GridRestResponse> handleAsyncUnsafe(final GridRestRequest req) throws IgniteCheckedException { + assert req instanceof GridRestTaskRequest : "Invalid command for topology handler: " + req; + + assert SUPPORTED_COMMANDS.contains(req.command()); + + if (log.isDebugEnabled()) + log.debug("Handling task REST request: " + req); + + GridRestTaskRequest req0 = (GridRestTaskRequest) req; + + final GridFutureAdapter<GridRestResponse> fut = new GridFutureAdapter<>(ctx); + + final GridRestResponse res = new GridRestResponse(); + + final GridClientTaskResultBean taskRestRes = new GridClientTaskResultBean(); + + // Set ID placeholder for the case it wouldn't be available due to remote execution. + taskRestRes.setId('~' + ctx.localNodeId().toString()); + + final boolean locExec = req0.destinationId() == null || req0.destinationId().equals(ctx.localNodeId()) || + ctx.discovery().node(req0.destinationId()) == null; + + switch (req.command()) { + case EXE: { + final boolean async = req0.async(); + + final String name = req0.taskName(); + + if (F.isEmpty(name)) + throw new IgniteCheckedException(missingParameter("name")); + + final List<Object> params = req0.params(); + + long timeout = req0.timeout(); + + final UUID clientId = req.clientId(); + + final ComputeTaskFuture<Object> taskFut; + + if (locExec) { + ClusterGroup prj = ctx.grid().forSubjectId(clientId); + + IgniteCompute comp = ctx.grid().compute(prj).withTimeout(timeout).enableAsync(); + + Object arg = !F.isEmpty(params) ? params.size() == 1 ? params.get(0) : params.toArray() : null; + + comp.execute(name, arg); + + taskFut = comp.future(); + } + else { + // Using predicate instead of node intentionally + // in order to provide user well-structured EmptyProjectionException. 
+ ClusterGroup prj = ctx.grid().forPredicate(F.nodeForNodeId(req.destinationId())); + + IgniteCompute comp = ctx.grid().compute(prj).withNoFailover().enableAsync(); + + comp.call(new ExeCallable(name, params, timeout, clientId)); + + taskFut = comp.future(); + } + + if (async) { + if (locExec) { + IgniteUuid tid = taskFut.getTaskSession().getId(); + + taskDescs.put(tid, new TaskDescriptor(false, null, null)); + + taskRestRes.setId(tid.toString() + '~' + ctx.localNodeId().toString()); + + res.setResponse(taskRestRes); + } + else + res.setError("Asynchronous task execution is not supported for routing request."); + + fut.onDone(res); + } + + taskFut.listenAsync(new IgniteInClosure<IgniteFuture<Object>>() { + @Override public void apply(IgniteFuture<Object> f) { + try { + TaskDescriptor desc; + + try { + desc = new TaskDescriptor(true, f.get(), null); + } + catch (IgniteCheckedException e) { + if (e.hasCause(ClusterTopologyException.class, ClusterGroupEmptyException.class)) + U.warn(log, "Failed to execute task due to topology issues (are all mapped " + + "nodes alive?) [name=" + name + ", clientId=" + req.clientId() + + ", err=" + e + ']'); + else + U.error(log, "Failed to execute task [name=" + name + ", clientId=" + + req.clientId() + ']', e); + + desc = new TaskDescriptor(true, null, e); + } + + if (async && locExec) { + assert taskFut instanceof ComputeTaskFuture; + + IgniteUuid tid = ((ComputeTaskFuture)taskFut).getTaskSession().getId(); + + taskDescs.put(tid, desc); + } + + if (!async) { + if (desc.error() == null) { + try { + taskRestRes.setFinished(true); + taskRestRes.setResult(req.portableMode() ? 
+ ctx.portable().marshalToPortable(desc.result()) : desc.result()); + + res.setResponse(taskRestRes); + fut.onDone(res); + } + catch (PortableException e) { + fut.onDone(new IgniteCheckedException("Failed to marshal task result: " + + desc.result(), e)); + } + } + else + fut.onDone(desc.error()); + } + } + finally { + if (!async && !fut.isDone()) + fut.onDone(new IgniteCheckedException("Failed to execute task (see server logs for details).")); + } + } + }); + + break; + } + + case RESULT: { + String id = req0.taskId(); + + if (F.isEmpty(id)) + throw new IgniteCheckedException(missingParameter("id")); + + StringTokenizer st = new StringTokenizer(id, "~"); + + if (st.countTokens() != 2) + throw new IgniteCheckedException("Failed to parse id parameter: " + id); + + String tidParam = st.nextToken(); + String resHolderIdParam = st.nextToken(); + + taskRestRes.setId(id); + + try { + IgniteUuid tid = !F.isEmpty(tidParam) ? IgniteUuid.fromString(tidParam) : null; + + UUID resHolderId = !F.isEmpty(resHolderIdParam) ? 
UUID.fromString(resHolderIdParam) : null; + + if (tid == null || resHolderId == null) + throw new IgniteCheckedException("Failed to parse id parameter: " + id); + + if (ctx.localNodeId().equals(resHolderId)) { + TaskDescriptor desc = taskDescs.get(tid); + + if (desc == null) + throw new IgniteCheckedException("Task with provided id has never been started on provided node" + + " [taskId=" + tidParam + ", taskResHolderId=" + resHolderIdParam + ']'); + + taskRestRes.setFinished(desc.finished()); + + if (desc.error() != null) + throw new IgniteCheckedException(desc.error().getMessage()); + + taskRestRes.setResult(desc.result()); + + res.setResponse(taskRestRes); + } + else { + IgniteBiTuple<String, GridTaskResultResponse> t = requestTaskResult(resHolderId, tid); + + if (t.get1() != null) + throw new IgniteCheckedException(t.get1()); + + GridTaskResultResponse taskRes = t.get2(); + + assert taskRes != null; + + if (!taskRes.found()) + throw new IgniteCheckedException("Task with provided id has never been started on provided node " + + "[taskId=" + tidParam + ", taskResHolderId=" + resHolderIdParam + ']'); + + taskRestRes.setFinished(taskRes.finished()); + + if (taskRes.error() != null) + throw new IgniteCheckedException(taskRes.error()); + + taskRestRes.setResult(taskRes.result()); + + res.setResponse(taskRestRes); + } + } + catch (IllegalArgumentException e) { + String msg = "Failed to parse parameters [taskId=" + tidParam + ", taskResHolderId=" + + resHolderIdParam + ", err=" + e.getMessage() + ']'; + + if (log.isDebugEnabled()) + log.debug(msg); + + throw new IgniteCheckedException(msg, e); + } + + fut.onDone(res); + + break; + } + + case NOOP: { + fut.onDone(new GridRestResponse()); + + break; + } + + default: + assert false : "Invalid command for task handler: " + req; + } + + if (log.isDebugEnabled()) + log.debug("Handled task REST request [res=" + res + ", req=" + req + ']'); + + return fut; + } + + /** + * @param resHolderId Result holder. 
+ * @param taskId Task ID. + * @return Response from task holder. + */ + private IgniteBiTuple<String, GridTaskResultResponse> requestTaskResult(final UUID resHolderId, IgniteUuid taskId) { + ClusterNode taskNode = ctx.discovery().node(resHolderId); + + if (taskNode == null) + return F.t("Task result holder has left grid: " + resHolderId, null); + + // Tuple: error message-response. + final IgniteBiTuple<String, GridTaskResultResponse> t = F.t2(); + + final Lock lock = new ReentrantLock(); + final Condition cond = lock.newCondition(); + + GridMessageListener msgLsnr = new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object msg) { + String err = null; + GridTaskResultResponse res = null; + + if (!(msg instanceof GridTaskResultResponse)) + err = "Received unexpected message: " + msg; + else if (!nodeId.equals(resHolderId)) + err = "Received task result response from unexpected node [resHolderId=" + resHolderId + + ", nodeId=" + nodeId + ']'; + else + // Sender and message type are fine. + res = (GridTaskResultResponse)msg; + + try { + res.result(ctx.config().getMarshaller().unmarshal(res.resultBytes(), null)); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to unmarshal task result: " + res, e); + } + + lock.lock(); + + try { + if (t.isEmpty()) { + t.set(err, res); + + cond.signalAll(); + } + } + finally { + lock.unlock(); + } + } + }; + + GridLocalEventListener discoLsnr = new GridLocalEventListener() { + @Override public void onEvent(IgniteEvent evt) { + assert evt instanceof IgniteDiscoveryEvent && + (evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT) : "Unexpected event: " + evt; + + IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt; + + if (resHolderId.equals(discoEvt.eventNode().id())) { + lock.lock(); + + try { + if (t.isEmpty()) { + t.set("Node that originated task execution has left grid: " + resHolderId, null); + + cond.signalAll(); + } + } + finally { + lock.unlock(); + } + } + } + }; + + // 1. 
Create unique topic name and register listener. + Object topic = TOPIC_REST.topic("task-result", topicIdGen.getAndIncrement()); + + try { + ctx.io().addMessageListener(topic, msgLsnr); + + // 2. Send message. + try { + byte[] topicBytes = ctx.config().getMarshaller().marshal(topic); + + ctx.io().send(taskNode, TOPIC_REST, new GridTaskResultRequest(taskId, topic, topicBytes), SYSTEM_POOL); + } + catch (IgniteCheckedException e) { + String errMsg = "Failed to send task result request [resHolderId=" + resHolderId + + ", err=" + e.getMessage() + ']'; + + if (log.isDebugEnabled()) + log.debug(errMsg); + + return F.t(errMsg, null); + } + + // 3. Listen to discovery events. + ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT); + + // 4. Check whether node has left before disco listener has been installed. + taskNode = ctx.discovery().node(resHolderId); + + if (taskNode == null) + return F.t("Task result holder has left grid: " + resHolderId, null); + + // 5. Wait for result. + lock.lock(); + + try { + long netTimeout = ctx.config().getNetworkTimeout(); + + if (t.isEmpty()) + cond.await(netTimeout, MILLISECONDS); + + if (t.isEmpty()) + t.set1("Timed out waiting for task result (consider increasing 'networkTimeout' " + + "configuration property) [resHolderId=" + resHolderId + ", netTimeout=" + netTimeout + ']'); + + // Return result + return t; + } + catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + + return F.t("Interrupted while waiting for task result.", null); + } + finally { + lock.unlock(); + } + } + finally { + ctx.io().removeMessageListener(topic, msgLsnr); + ctx.event().removeLocalEventListener(discoLsnr); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridTaskCommandHandler.class, this); + } + + /** + * Immutable task execution state descriptor. 
+ */ + private static class TaskDescriptor { + /** */ + private final boolean finished; + + /** */ + private final Object res; + + /** */ + private final Throwable err; + + /** + * @param finished Finished flag. + * @param res Result. + * @param err Error. + */ + private TaskDescriptor(boolean finished, @Nullable Object res, @Nullable Throwable err) { + this.finished = finished; + this.res = res; + this.err = err; + } + + /** + * @return {@code true} if finished. + */ + public boolean finished() { + return finished; + } + + /** + * @return Task result. + */ + @Nullable public Object result() { + return res; + } + + /** + * @return Error. + */ + @Nullable public Throwable error() { + return err; + } + } + + /** + * Callable for EXE request routing. + */ + @GridInternal + private static class ExeCallable implements Callable<Object>, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private String name; + + /** */ + private List<Object> params; + + /** */ + private long timeout; + + /** */ + private UUID clientId; + + /** */ + @IgniteInstanceResource + private GridEx g; + + /** + * Required by {@link Externalizable}. + */ + public ExeCallable() { + // No-op. + } + + /** + * @param name Name. + * @param params Params. + * @param timeout Timeout. + * @param clientId Client ID. + */ + private ExeCallable(String name, List<Object> params, long timeout, UUID clientId) { + this.name = name; + this.params = params; + this.timeout = timeout; + this.clientId = clientId; + } + + /** {@inheritDoc} */ + @Override public Object call() throws Exception { + return g.compute(g.forSubjectId(clientId)).execute( + name, + !params.isEmpty() ? params.size() == 1 ? 
params.get(0) : params.toArray() : null); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, name); + out.writeObject(params); + out.writeLong(timeout); + U.writeUuid(out, clientId); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + name = U.readString(in); + params = (List<Object>)in.readObject(); + timeout = in.readLong(); + clientId = U.readUuid(in); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultRequest.java new file mode 100644 index 0000000..61d2822 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultRequest.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.handlers.task; + +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.direct.*; + +import java.io.*; +import java.nio.*; + +/** + * Task result request. + */ +public class GridTaskResultRequest extends GridTcpCommunicationMessageAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** Task ID. */ + private IgniteUuid taskId; + + /** Topic. */ + @GridDirectTransient + private Object topic; + + /** Serialized topic. */ + private byte[] topicBytes; + + /** + * Public no-arg constructor for {@link Externalizable} support. + */ + public GridTaskResultRequest() { + // No-op. + } + + /** + * @param taskId Task ID. + * @param topic Topic. + * @param topicBytes Serialized topic. + */ + GridTaskResultRequest(IgniteUuid taskId, Object topic, byte[] topicBytes) { + this.taskId = taskId; + this.topic = topic; + this.topicBytes = topicBytes; + } + + /** + * @return Task ID. + */ + public IgniteUuid taskId() { + return taskId; + } + + /** + * @param taskId Task ID. + */ + public void taskId(IgniteUuid taskId) { + assert taskId != null; + + this.taskId = taskId; + } + + /** + * @return Topic. + */ + public Object topic() { + return topic; + } + + /** + * @return Serialized topic. + */ + public byte[] topicBytes() { + return topicBytes; + } + + /** + * @param topic Topic. 
+ */ + public void topic(String topic) { + assert topic != null; + + this.topic = topic; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridTaskResultRequest _clone = new GridTaskResultRequest(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + GridTaskResultRequest _clone = (GridTaskResultRequest)_msg; + + _clone.taskId = taskId; + _clone.topic = topic; + _clone.topicBytes = topicBytes; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!commState.typeWritten) { + if (!commState.putByte(directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 0: + if (!commState.putGridUuid(taskId)) + return false; + + commState.idx++; + + case 1: + if (!commState.putByteArray(topicBytes)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + switch (commState.idx) { + case 0: + IgniteUuid taskId0 = commState.getGridUuid(); + + if (taskId0 == GRID_UUID_NOT_READ) + return false; + + taskId = taskId0; + + commState.idx++; + + case 1: + byte[] topicBytes0 = commState.getByteArray(); + + if (topicBytes0 == BYTE_ARR_NOT_READ) + return false; + + topicBytes = topicBytes0; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 73; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultResponse.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultResponse.java new file mode 100644 index 0000000..fb20156 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultResponse.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.handlers.task; + +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.direct.*; +import org.jetbrains.annotations.*; + +import java.nio.*; + +/** + * Task result response. + */ +public class GridTaskResultResponse extends GridTcpCommunicationMessageAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** Result. */ + @GridDirectTransient + private Object res; + + /** Serialized result. */ + private byte[] resBytes; + + /** Finished flag. */ + private boolean finished; + + /** Flag indicating that task has ever been launched on node. */ + private boolean found; + + /** Error. */ + private String err; + + /** + * @return Task result. 
+ */ + @Nullable public Object result() { + return res; + } + + /** + * @param res Task result. + */ + public void result(@Nullable Object res) { + this.res = res; + } + + /** + * @param resBytes Serialized result. + */ + public void resultBytes(byte[] resBytes) { + this.resBytes = resBytes; + } + + /** + * @return Serialized result. + */ + public byte[] resultBytes() { + return resBytes; + } + + /** + * @return {@code true} if finished. + */ + public boolean finished() { + return finished; + } + + /** + * @param finished {@code true} if finished. + */ + public void finished(boolean finished) { + this.finished = finished; + } + + /** + * @return {@code true} if found. + */ + public boolean found() { + return found; + } + + /** + * @param found {@code true} if found. + */ + public void found(boolean found) { + this.found = found; + } + + /** + * @return Error. + */ + public String error() { + return err; + } + + /** + * @param err Error. + */ + public void error(String err) { + this.err = err; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridTaskResultResponse _clone = new GridTaskResultResponse(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + GridTaskResultResponse _clone = (GridTaskResultResponse)_msg; + + _clone.res = res; + _clone.resBytes = resBytes; + _clone.finished = finished; + _clone.found = found; + _clone.err = err; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!commState.typeWritten) { + if (!commState.putByte(directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 0: + if (!commState.putString(err)) + return false; + + commState.idx++; + + case 1: + if 
(!commState.putBoolean(finished)) + return false; + + commState.idx++; + + case 2: + if (!commState.putBoolean(found)) + return false; + + commState.idx++; + + case 3: + if (!commState.putByteArray(resBytes)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + switch (commState.idx) { + case 0: + String err0 = commState.getString(); + + if (err0 == STR_NOT_READ) + return false; + + err = err0; + + commState.idx++; + + case 1: + if (buf.remaining() < 1) + return false; + + finished = commState.getBoolean(); + + commState.idx++; + + case 2: + if (buf.remaining() < 1) + return false; + + found = commState.getBoolean(); + + commState.idx++; + + case 3: + byte[] resBytes0 = commState.getByteArray(); + + if (resBytes0 == BYTE_ARR_NOT_READ) + return false; + + resBytes = resBytes0; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 74; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java new file mode 100644 index 0000000..67929ea --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.handlers.top; + +import org.apache.ignite.*; +import org.apache.ignite.cache.affinity.consistenthash.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.internal.processors.port.*; +import org.apache.ignite.internal.processors.rest.*; +import org.apache.ignite.internal.processors.rest.client.message.*; +import org.apache.ignite.internal.processors.rest.handlers.*; +import org.apache.ignite.internal.processors.rest.request.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.net.*; +import java.util.*; + +import static org.apache.ignite.internal.GridNodeAttributes.*; +import static org.apache.ignite.internal.processors.rest.GridRestCommand.*; + +/** + * Command handler for API requests. + */ +public class GridTopologyCommandHandler extends GridRestCommandHandlerAdapter { + /** Supported commands. */ + private static final Collection<GridRestCommand> SUPPORTED_COMMANDS = U.sealList(TOPOLOGY, NODE); + + /** + * @param ctx Context. 
+ */ + public GridTopologyCommandHandler(GridKernalContext ctx) { + super(ctx); + } + + /** {@inheritDoc} */ + @Override public Collection<GridRestCommand> supportedCommands() { + return SUPPORTED_COMMANDS; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<GridRestResponse> handleAsync(GridRestRequest req) { + assert req instanceof GridRestTopologyRequest : "Invalid command for topology handler: " + req; + + assert SUPPORTED_COMMANDS.contains(req.command()); + + if (log.isDebugEnabled()) + log.debug("Handling topology REST request: " + req); + + GridRestTopologyRequest req0 = (GridRestTopologyRequest)req; + + GridRestResponse res = new GridRestResponse(); + + boolean mtr = req0.includeMetrics(); + boolean attr = req0.includeAttributes(); + + switch (req.command()) { + case TOPOLOGY: { + Collection<ClusterNode> allNodes = F.concat(false, + ctx.discovery().allNodes(), ctx.discovery().daemonNodes()); + + Collection<GridClientNodeBean> top = + new ArrayList<>(allNodes.size()); + + for (ClusterNode node : allNodes) + top.add(createNodeBean(node, mtr, attr)); + + res.setResponse(top); + + break; + } + + case NODE: { + UUID id = req0.nodeId(); + + final String ip = req0.nodeIp(); + + if (id == null && ip == null) + return new GridFinishedFuture<>(ctx, new IgniteCheckedException( + "Failed to handle request (either id or ip should be specified).")); + + ClusterNode node; + + if (id != null) { + // Always refresh topology so client see most up-to-date view. 
+ ctx.discovery().alive(id); + + node = ctx.grid().node(id); + + if (ip != null && node != null && !containsIp(node.addresses(), ip)) + node = null; + } + else + node = F.find(ctx.discovery().allNodes(), null, new P1<ClusterNode>() { + @Override + public boolean apply(ClusterNode n) { + return containsIp(n.addresses(), ip); + } + }); + + if (node != null) + res.setResponse(createNodeBean(node, mtr, attr)); + else + res.setResponse(null); + + break; + } + + default: + assert false : "Invalid command for topology handler: " + req; + } + + if (log.isDebugEnabled()) + log.debug("Handled topology REST request [res=" + res + ", req=" + req + ']'); + + return new GridFinishedFuture<>(ctx, res); + } + + /** + * @param addrs List of string addresses. + * @param ip Ip to match. + * @return Whether {@code ip} present in addresses. + */ + private boolean containsIp(Iterable<String> addrs, String ip) { + for (String addr : addrs) { + try { + if (InetAddress.getByName(addr).getHostAddress().equals(ip)) + return true; + } + catch (UnknownHostException ignored) { + // It's ok if we just don't know that host - node could be bound to address in another network. + } + } + + return false; + } + + /** + * Creates node bean out of grid node. Notice that cache attribute is handled separately. + * + * @param node Grid node. + * @param mtr {@code true} to add metrics. + * @param attr {@code true} to add attributes. + * @return Grid Node bean. 
+ */ + private GridClientNodeBean createNodeBean(ClusterNode node, boolean mtr, boolean attr) { + assert node != null; + + GridClientNodeBean nodeBean = new GridClientNodeBean(); + + nodeBean.setNodeId(node.id()); + nodeBean.setConsistentId(node.consistentId()); + nodeBean.setTcpPort(attribute(node, ATTR_REST_TCP_PORT, 0)); + + nodeBean.setTcpAddresses(nonEmptyList(node.<Collection<String>>attribute(ATTR_REST_TCP_ADDRS))); + nodeBean.setTcpHostNames(nonEmptyList(node.<Collection<String>>attribute(ATTR_REST_TCP_HOST_NAMES))); + + Integer dfltReplicaCnt = node.attribute(GridCacheConsistentHashAffinityFunction.DFLT_REPLICA_COUNT_ATTR_NAME); + + if (dfltReplicaCnt == null) + dfltReplicaCnt = GridCacheConsistentHashAffinityFunction.DFLT_REPLICA_COUNT; + + nodeBean.setReplicaCount(dfltReplicaCnt); + + GridCacheAttributes[] caches = node.attribute(ATTR_CACHE); + + if (!F.isEmpty(caches)) { + Map<String, String> cacheMap = new HashMap<>(); + + for (GridCacheAttributes cacheAttr : caches) { + if (ctx.cache().systemCache(cacheAttr.cacheName())) + continue; + + if (cacheAttr.cacheName() != null) + cacheMap.put(cacheAttr.cacheName(), cacheAttr.cacheMode().toString()); + else + nodeBean.setDefaultCacheMode(cacheAttr.cacheMode().toString()); + } + + nodeBean.setCaches(cacheMap); + } + + if (mtr) { + ClusterNodeMetrics metrics = node.metrics(); + + GridClientNodeMetricsBean metricsBean = new GridClientNodeMetricsBean(); + + metricsBean.setStartTime(metrics.getStartTime()); + metricsBean.setAverageActiveJobs(metrics.getAverageActiveJobs()); + metricsBean.setAverageCancelledJobs(metrics.getAverageCancelledJobs()); + metricsBean.setAverageCpuLoad(metrics.getAverageCpuLoad()); + metricsBean.setAverageJobExecuteTime(metrics.getAverageJobExecuteTime()); + metricsBean.setAverageJobWaitTime(metrics.getAverageJobWaitTime()); + metricsBean.setAverageRejectedJobs(metrics.getAverageRejectedJobs()); + metricsBean.setAverageWaitingJobs(metrics.getAverageWaitingJobs()); + 
metricsBean.setCurrentActiveJobs(metrics.getCurrentActiveJobs()); + metricsBean.setCurrentCancelledJobs(metrics.getCurrentCancelledJobs()); + metricsBean.setCurrentCpuLoad(metrics.getCurrentCpuLoad()); + metricsBean.setCurrentGcCpuLoad(metrics.getCurrentGcCpuLoad()); + metricsBean.setCurrentDaemonThreadCount(metrics.getCurrentDaemonThreadCount()); + metricsBean.setCurrentIdleTime(metrics.getCurrentIdleTime()); + metricsBean.setCurrentJobExecuteTime(metrics.getCurrentJobExecuteTime()); + metricsBean.setCurrentJobWaitTime(metrics.getCurrentJobWaitTime()); + metricsBean.setCurrentRejectedJobs(metrics.getCurrentRejectedJobs()); + metricsBean.setCurrentThreadCount(metrics.getCurrentThreadCount()); + metricsBean.setCurrentWaitingJobs(metrics.getCurrentWaitingJobs()); + metricsBean.setHeapMemoryCommitted(metrics.getHeapMemoryCommitted()); + metricsBean.setHeapMemoryInitialized(metrics.getHeapMemoryInitialized()); + metricsBean.setHeapMemoryMaximum(metrics.getHeapMemoryMaximum()); + metricsBean.setHeapMemoryUsed(metrics.getHeapMemoryUsed()); + metricsBean.setLastDataVersion(metrics.getLastDataVersion()); + metricsBean.setLastUpdateTime(metrics.getLastUpdateTime()); + metricsBean.setMaximumActiveJobs(metrics.getMaximumActiveJobs()); + metricsBean.setMaximumCancelledJobs(metrics.getMaximumCancelledJobs()); + metricsBean.setMaximumJobExecuteTime(metrics.getMaximumJobExecuteTime()); + metricsBean.setMaximumJobWaitTime(metrics.getMaximumJobWaitTime()); + metricsBean.setMaximumRejectedJobs(metrics.getMaximumRejectedJobs()); + metricsBean.setMaximumThreadCount(metrics.getMaximumThreadCount()); + metricsBean.setMaximumWaitingJobs(metrics.getMaximumWaitingJobs()); + metricsBean.setNodeStartTime(metrics.getNodeStartTime()); + metricsBean.setNonHeapMemoryCommitted(metrics.getNonHeapMemoryCommitted()); + metricsBean.setNonHeapMemoryInitialized(metrics.getNonHeapMemoryInitialized()); + metricsBean.setNonHeapMemoryMaximum(metrics.getNonHeapMemoryMaximum()); + 
metricsBean.setNonHeapMemoryUsed(metrics.getNonHeapMemoryUsed()); + metricsBean.setStartTime(metrics.getStartTime()); + metricsBean.setTotalCancelledJobs(metrics.getTotalCancelledJobs()); + metricsBean.setTotalCpus(metrics.getTotalCpus()); + metricsBean.setTotalExecutedJobs(metrics.getTotalExecutedJobs()); + metricsBean.setTotalIdleTime(metrics.getTotalIdleTime()); + metricsBean.setTotalRejectedJobs(metrics.getTotalRejectedJobs()); + metricsBean.setTotalStartedThreadCount(metrics.getTotalStartedThreadCount()); + metricsBean.setTotalExecutedTasks(metrics.getTotalExecutedTasks()); + metricsBean.setSentMessagesCount(metrics.getSentMessagesCount()); + metricsBean.setSentBytesCount(metrics.getSentBytesCount()); + metricsBean.setReceivedMessagesCount(metrics.getReceivedMessagesCount()); + metricsBean.setReceivedBytesCount(metrics.getReceivedBytesCount()); + metricsBean.setUpTime(metrics.getUpTime()); + + nodeBean.setMetrics(metricsBean); + } + + if (attr) { + Map<String, Object> attrs = new HashMap<>(node.attributes()); + + attrs.remove(ATTR_CACHE); + attrs.remove(ATTR_TX_CONFIG); + attrs.remove(ATTR_SECURITY_SUBJECT); + attrs.remove(ATTR_SECURITY_CREDENTIALS); + + for (Iterator<Map.Entry<String, Object>> i = attrs.entrySet().iterator(); i.hasNext();) { + Map.Entry<String, Object> e = i.next(); + + if (!e.getKey().startsWith("org.gridgain.") && System.getProperty(e.getKey()) == null) { + i.remove(); + + continue; + } + + if (e.getValue() != null) { + if (e.getValue().getClass().isEnum() || e.getValue() instanceof InetAddress) + e.setValue(e.getValue().toString()); + else if (e.getValue().getClass().isArray()) + i.remove(); + } + } + + nodeBean.setAttributes(attrs); + } + + return nodeBean; + } + + /** + * @param col Collection; + * @return Non-empty list. + */ + private static Collection<String> nonEmptyList(Collection<String> col) { + return col == null ? Collections.<String>emptyList() : col; + } + + /** + * Get node attribute by specified attribute name. 
+ * + * @param node Node to get attribute for. + * @param attrName Attribute name. + * @param dfltVal Default result for case when node attribute resolved into {@code null}. + * @return Attribute value or default result if requested attribute resolved into {@code null}. + */ + private <T> T attribute(ClusterNode node, String attrName, T dfltVal) { + T attr = node.attribute(attrName); + + return attr == null ? dfltVal : attr; + } + + /** + * Get registered port + * + * @param protoCls Protocol class. + * @param def Default value if such class is not registered. + * @return Registered port for the protocol class or {@code def}ault value if such class is not registered. + */ + private int getRegisteredPort(Class<? extends GridRestProtocol> protoCls, int def) { + for (GridPortRecord r : ctx.ports().records()) { + if (r.protocol() == IgnitePortProtocol.TCP && protoCls.isAssignableFrom(r.clazz())) + return r.port(); + } + + return def; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridTopologyCommandHandler.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/package.html new file mode 100644 index 0000000..b9da8c5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/package.html @@ -0,0 +1,23 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <!-- Package description. --> + REST topology commands. +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/version/GridVersionCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/version/GridVersionCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/version/GridVersionCommandHandler.java new file mode 100644 index 0000000..d1f71cb --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/version/GridVersionCommandHandler.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.rest.handlers.version;

import org.apache.ignite.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.internal.processors.rest.*;
import org.apache.ignite.internal.processors.rest.handlers.*;
import org.apache.ignite.internal.processors.rest.request.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.typedef.internal.*;

import java.util.*;

import static org.apache.ignite.internal.GridProductImpl.*;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.*;

/**
 * Handler for {@link GridRestCommand#VERSION} command. Replies with the {@code COMPOUND_VER} constant.
 */
public class GridVersionCommandHandler extends GridRestCommandHandlerAdapter {
    /** Supported commands. */
    private static final Collection<GridRestCommand> SUPPORTED_COMMANDS = U.sealList(VERSION);

    /**
     * @param ctx Context.
     */
    public GridVersionCommandHandler(GridKernalContext ctx) {
        super(ctx);
    }

    /** {@inheritDoc} */
    @Override public Collection<GridRestCommand> supportedCommands() {
        return SUPPORTED_COMMANDS;
    }

    /** {@inheritDoc} */
    @Override public IgniteFuture<GridRestResponse> handleAsync(GridRestRequest req) {
        assert req != null;

        assert SUPPORTED_COMMANDS.contains(req.command());

        // The reply does not depend on the request contents, so an already completed future is returned.
        GridRestResponse verRes = new GridRestResponse(COMPOUND_VER);

        return new GridFinishedFuture<>(ctx, verRes);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/version/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/version/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/version/package.html new file mode 100644 index 0000000..d0c805f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/version/package.html @@ -0,0 +1,23 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License.
+ --> +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> +<!-- Package description. --> + REST version command. +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/package.html new file mode 100644 index 0000000..9d1f975 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/package.html @@ -0,0 +1,23 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <!-- Package description. --> + REST processor. 
+</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/GridRestProtocolAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/GridRestProtocolAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/GridRestProtocolAdapter.java new file mode 100644 index 0000000..1f44116 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/GridRestProtocolAdapter.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.rest.protocols; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.processors.rest.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; +import sun.misc.*; + +import java.io.*; +import java.net.*; +import java.nio.charset.*; +import java.security.*; +import java.util.*;
+
+/**
+ * Abstract protocol adapter.
+ * <p>
+ * Keeps the state shared by REST protocol implementations (kernal context, logger, configured
+ * secret key, host and port) and offers helpers for token authentication, start/stop log
+ * messages, parameter validation and publication of the endpoint properties.
+ */
+public abstract class GridRestProtocolAdapter implements GridRestProtocol {
+    /** UTF-8 charset. */
+    private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+    /** Context. */
+    protected final GridKernalContext ctx;
+
+    /** Logger. */
+    protected final IgniteLogger log;
+
+    /** Secret key read from the client connection configuration. */
+    protected final String secretKey;
+
+    /** Host used by this protocol. */
+    protected InetAddress host;
+
+    /** Port used by this protocol. */
+    protected int port;
+
+    /**
+     * Stores the context, creates a logger for the concrete class and reads the REST secret key
+     * from the client connection configuration, which must be present.
+     *
+     * @param ctx Context.
+     */
+    @SuppressWarnings({"OverriddenMethodCallDuringObjectConstruction"})
+    protected GridRestProtocolAdapter(GridKernalContext ctx) {
+        assert ctx != null;
+        assert ctx.config().getClientConnectionConfiguration() != null;
+
+        this.ctx = ctx;
+
+        log = ctx.log(getClass());
+
+        secretKey = ctx.config().getClientConnectionConfiguration().getRestSecretKey();
+    }
+
+    /**
+     * Authenticates current request.
+     * <p>
+     * Token consists of 2 parts separated by colon:
+     * <ol>
+     *     <li>Timestamp (time in milliseconds)</li>
+     *     <li>Base64 encoded SHA1 hash of {timestamp}:{secretKey}</li>
+     * </ol>
+     * If the configured secret key is empty, every request is accepted. The timestamp itself is
+     * not validated here; only the hash is checked.
+     *
+     * @param tok Authentication token.
+     * @return {@code true} if authentication info provided in request is correct.
+     */
+    protected boolean authenticate(@Nullable String tok) {
+        if (F.isEmpty(secretKey))
+            return true;
+
+        if (F.isEmpty(tok))
+            return false;
+
+        StringTokenizer st = new StringTokenizer(tok, ":");
+
+        if (st.countTokens() != 2)
+            return false;
+
+        String ts = st.nextToken();
+        String hash = st.nextToken();
+
+        String s = ts + ':' + secretKey;
+
+        try {
+            MessageDigest md = MessageDigest.getInstance("SHA-1");
+
+            BASE64Encoder enc = new BASE64Encoder();
+
+            md.update(s.getBytes(UTF_8));
+
+            String compHash = enc.encode(md.digest());
+
+            // Base64 is case-sensitive, so the hashes must match exactly; MessageDigest.isEqual
+            // also keeps the comparison time independent of where the first mismatch occurs.
+            return MessageDigest.isEqual(hash.getBytes(UTF_8), compHash.getBytes(UTF_8));
+        }
+        catch (NoSuchAlgorithmException e) {
+            U.error(log, "Failed to check authentication signature.", e);
+        }
+
+        return false;
+    }
+
+    /**
+     * @return Start information string.
+     */
+    protected String startInfo() {
+        return "Command protocol successfully started [name=" + name() + ", host=" + host + ", port=" + port + ']';
+    }
+
+    /**
+     * @return Stop information string.
+     */
+    protected String stopInfo() {
+        return "Command protocol successfully stopped: " + name();
+    }
+
+    /**
+     * Fails with a checked exception when a configuration condition does not hold.
+     *
+     * @param cond Condition to check.
+     * @param condDesc Error message.
+     * @throws IgniteCheckedException If check failed.
+     */
+    protected final void assertParameter(boolean cond, String condDesc) throws IgniteCheckedException {
+        if (!cond)
+            throw new IgniteCheckedException("REST protocol parameter failed condition check: " + condDesc);
+    }
+
+    /**
+     * @return Client configuration.
+     */
+    protected ClientConnectionConfiguration config() {
+        return ctx.config().getClientConnectionConfiguration();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns an empty collection while no positive port is set. Otherwise returns three
+     * name/value pairs: addresses, host names and port. The address and host name values are
+     * {@code null} when no host has been set. Returns {@code null} if local addresses could
+     * not be resolved.
+     */
+    @Override public Collection<IgniteBiTuple<String, Object>> getProperties() {
+        try {
+            // All addresses for wildcard endpoint, `null` without.
+            IgniteBiTuple<Collection<String>, Collection<String>> addrs = host != null ?
+                U.resolveLocalAddresses(host) : null;
+
+            if (port <= 0)
+                return Collections.<IgniteBiTuple<String, Object>>emptyList();
+
+            // 'addrs' is null when host is not set: publish null values instead of failing with NPE.
+            Collection<String> ips = addrs != null ? addrs.get1() : null;
+            Collection<String> hostNames = addrs != null ? addrs.get2() : null;
+
+            return Arrays.asList(
+                F.<String, Object>t(getAddressPropertyName(), ips),
+                F.<String, Object>t(getHostNamePropertyName(), hostNames),
+                F.<String, Object>t(getPortPropertyName(), port)
+            );
+        }
+        catch (IgniteCheckedException | IOException ignored) {
+            // NOTE(review): resolution failure is reported as null - confirm callers handle it.
+            return null;
+        }
+    }
+
+    /**
+     * Return node attribute name to store used address.
+     *
+     * @return Node attribute name.
+     */
+    protected abstract String getAddressPropertyName();
+
+    /**
+     * Return node attribute name to store used host name.
+     *
+     * @return Node attribute name.
+     */
+    protected abstract String getHostNamePropertyName();
+
+    /**
+     * Return node attribute name to store used port number.
+     *
+     * @return Node attribute name.
+     */
+    protected abstract String getPortPropertyName();
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStart() {
+        // No-op.
+    }
+}
