http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopPrepareForJobRequest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopPrepareForJobRequest.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopPrepareForJobRequest.java deleted file mode 100644 index 4aa1596..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopPrepareForJobRequest.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; -import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; - -/** - * Child process initialization request. - */ -public class HadoopPrepareForJobRequest implements HadoopMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** Job ID. */ - @GridToStringInclude - private HadoopJobId jobId; - - /** Job info. */ - @GridToStringInclude - private HadoopJobInfo jobInfo; - - /** Total amount of reducers in the job. */ - @GridToStringInclude - private int totalReducersCnt; - - /** Reducers to be executed on current node. */ - @GridToStringInclude - private int[] locReducers; - - /** - * Constructor required by {@link Externalizable}. - */ - public HadoopPrepareForJobRequest() { - // No-op. - } - - /** - * @param jobId Job ID. - * @param jobInfo Job info. - * @param totalReducersCnt Number of reducers in the job. - * @param locReducers Reducers to be executed on current node. - */ - public HadoopPrepareForJobRequest(HadoopJobId jobId, HadoopJobInfo jobInfo, int totalReducersCnt, - int[] locReducers) { - assert jobId != null; - - this.jobId = jobId; - this.jobInfo = jobInfo; - this.totalReducersCnt = totalReducersCnt; - this.locReducers = locReducers; - } - - /** - * @return Job info. - */ - public HadoopJobInfo jobInfo() { - return jobInfo; - } - - /** - * @return Job ID. - */ - public HadoopJobId jobId() { - return jobId; - } - - /** - * @return Reducers to be executed on current node. - */ - public int[] localReducers() { - return locReducers; - } - - /** - * @return Number of reducers in job. - */ - public int totalReducerCount() { - return totalReducersCnt; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - jobId.writeExternal(out); - - out.writeObject(jobInfo); - out.writeInt(totalReducersCnt); - - U.writeIntArray(out, locReducers); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - jobId = new HadoopJobId(); - jobId.readExternal(in); - - jobInfo = (HadoopJobInfo)in.readObject(); - totalReducersCnt = in.readInt(); - - locReducers = U.readIntArray(in); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopPrepareForJobRequest.class, this); - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopProcessDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopProcessDescriptor.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopProcessDescriptor.java deleted file mode 100644 index 388c7b4..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopProcessDescriptor.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external; - -import java.io.Serializable; -import java.util.UUID; -import org.apache.ignite.internal.util.typedef.internal.S; - -/** - * Process descriptor used to identify process for which task is running. - */ -public class HadoopProcessDescriptor implements Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** Parent node ID. */ - private UUID parentNodeId; - - /** Process ID. */ - private UUID procId; - - /** Address. */ - private String addr; - - /** TCP port. */ - private int tcpPort; - - /** Shared memory port. */ - private int shmemPort; - - /** - * @param parentNodeId Parent node ID. - * @param procId Process ID. - */ - public HadoopProcessDescriptor(UUID parentNodeId, UUID procId) { - this.parentNodeId = parentNodeId; - this.procId = procId; - } - - /** - * Gets process ID. - * - * @return Process ID. - */ - public UUID processId() { - return procId; - } - - /** - * Gets parent node ID. - * - * @return Parent node ID. - */ - public UUID parentNodeId() { - return parentNodeId; - } - - /** - * Gets host address. - * - * @return Host address. - */ - public String address() { - return addr; - } - - /** - * Sets host address. - * - * @param addr Host address. - */ - public void address(String addr) { - this.addr = addr; - } - - /** - * @return Shared memory port. - */ - public int sharedMemoryPort() { - return shmemPort; - } - - /** - * Sets shared memory port. - * - * @param shmemPort Shared memory port. - */ - public void sharedMemoryPort(int shmemPort) { - this.shmemPort = shmemPort; - } - - /** - * @return TCP port. - */ - public int tcpPort() { - return tcpPort; - } - - /** - * Sets TCP port. - * - * @param tcpPort TCP port. - */ - public void tcpPort(int tcpPort) { - this.tcpPort = tcpPort; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (!(o instanceof HadoopProcessDescriptor)) - return false; - - HadoopProcessDescriptor that = (HadoopProcessDescriptor)o; - - return parentNodeId.equals(that.parentNodeId) && procId.equals(that.procId); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int result = parentNodeId.hashCode(); - - result = 31 * result + procId.hashCode(); - - return result; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopProcessDescriptor.class, this); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopProcessStartedAck.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopProcessStartedAck.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopProcessStartedAck.java deleted file mode 100644 index 81c062e..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopProcessStartedAck.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external; - -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; -import org.apache.ignite.internal.util.typedef.internal.S; - -/** - * Process started message. - */ -public class HadoopProcessStartedAck implements HadoopMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopProcessStartedAck.class, this); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopTaskExecutionRequest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopTaskExecutionRequest.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopTaskExecutionRequest.java deleted file mode 100644 index f0a8139..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopTaskExecutionRequest.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external; - -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.Collection; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; -import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; - -/** - * Message sent from node to child process to start task(s) execution. - */ -public class HadoopTaskExecutionRequest implements HadoopMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** Job ID. */ - @GridToStringInclude - private HadoopJobId jobId; - - /** Job info. */ - @GridToStringInclude - private HadoopJobInfo jobInfo; - - /** Mappers. */ - @GridToStringInclude - private Collection<HadoopTaskInfo> tasks; - - /** - * @return Job ID. - */ - public HadoopJobId jobId() { - return jobId; - } - - /** - * @param jobId Job ID. - */ - public void jobId(HadoopJobId jobId) { - this.jobId = jobId; - } - - /** - * @return Jon info. - */ - public HadoopJobInfo jobInfo() { - return jobInfo; - } - - /** - * @param jobInfo Job info. - */ - public void jobInfo(HadoopJobInfo jobInfo) { - this.jobInfo = jobInfo; - } - - /** - * @return Tasks. - */ - public Collection<HadoopTaskInfo> tasks() { - return tasks; - } - - /** - * @param tasks Tasks. - */ - public void tasks(Collection<HadoopTaskInfo> tasks) { - this.tasks = tasks; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopTaskExecutionRequest.class, this); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - jobId.writeExternal(out); - - out.writeObject(jobInfo); - U.writeCollection(out, tasks); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - jobId = new HadoopJobId(); - jobId.readExternal(in); - - jobInfo = (HadoopJobInfo)in.readObject(); - tasks = U.readCollection(in); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopTaskFinishedMessage.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopTaskFinishedMessage.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopTaskFinishedMessage.java deleted file mode 100644 index a4a2a8d..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopTaskFinishedMessage.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; -import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; -import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.HadoopTaskStatus; -import org.apache.ignite.internal.util.typedef.internal.S; - -/** - * Task finished message. Sent when local task finishes execution. - */ -public class HadoopTaskFinishedMessage implements HadoopMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** Finished task info. */ - private HadoopTaskInfo taskInfo; - - /** Task finish status. */ - private HadoopTaskStatus status; - - /** - * Constructor required by {@link Externalizable}. - */ - public HadoopTaskFinishedMessage() { - // No-op. - } - - /** - * @param taskInfo Finished task info. - * @param status Task finish status. - */ - public HadoopTaskFinishedMessage(HadoopTaskInfo taskInfo, HadoopTaskStatus status) { - assert taskInfo != null; - assert status != null; - - this.taskInfo = taskInfo; - this.status = status; - } - - /** - * @return Finished task info. - */ - public HadoopTaskInfo taskInfo() { - return taskInfo; - } - - /** - * @return Task finish status. - */ - public HadoopTaskStatus status() { - return status; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopTaskFinishedMessage.class, this); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - taskInfo.writeExternal(out); - status.writeExternal(out); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - taskInfo = new HadoopTaskInfo(); - taskInfo.readExternal(in); - - status = new HadoopTaskStatus(); - status.readExternal(in); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/child/HadoopChildProcessRunner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/child/HadoopChildProcessRunner.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/child/HadoopChildProcessRunner.java deleted file mode 100644 index 96c3ec6..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/child/HadoopChildProcessRunner.java +++ /dev/null @@ -1,460 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.child; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.hadoop.HadoopHelperImpl; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput; -import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; -import org.apache.ignite.internal.processors.hadoop.impl.shuffle.HadoopShuffleAck; -import org.apache.ignite.internal.processors.hadoop.impl.shuffle.HadoopShuffleJob; -import org.apache.ignite.internal.processors.hadoop.impl.shuffle.HadoopShuffleMessage; -import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.HadoopExecutorService; -import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.HadoopRunnableTask; -import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.HadoopTaskState; -import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.HadoopTaskStatus; -import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.HadoopJobInfoUpdateRequest; -import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.HadoopPrepareForJobRequest; -import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.HadoopProcessDescriptor; -import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.HadoopProcessStartedAck; -import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.HadoopTaskExecutionRequest; -import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.HadoopTaskFinishedMessage; -import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.communication.HadoopExternalCommunication; -import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.communication.HadoopMessageListener; -import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2Job; -import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.lang.IgniteInClosure2X; -import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; -import org.apache.ignite.internal.util.typedef.CI1; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.U; - -import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.MAP; -import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.REDUCE; - -/** - * Hadoop process base. - */ -@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") -public class HadoopChildProcessRunner { - /** Node process descriptor. */ - private HadoopProcessDescriptor nodeDesc; - - /** Message processing executor service. */ - private ExecutorService msgExecSvc; - - /** Task executor service. */ - private HadoopExecutorService execSvc; - - /** */ - protected GridUnsafeMemory mem = new GridUnsafeMemory(0); - - /** External communication. */ - private HadoopExternalCommunication comm; - - /** Logger. */ - private IgniteLogger log; - - /** Init guard. */ - private final AtomicBoolean initGuard = new AtomicBoolean(); - - /** Start time. */ - private long startTime; - - /** Init future. */ - private final GridFutureAdapter<?> initFut = new GridFutureAdapter<>(); - - /** Job instance. */ - private HadoopJob job; - - /** Number of uncompleted tasks. */ - private final AtomicInteger pendingTasks = new AtomicInteger(); - - /** Shuffle job. */ - private HadoopShuffleJob<HadoopProcessDescriptor> shuffleJob; - - /** Concurrent mappers. */ - private int concMappers; - - /** Concurrent reducers. */ - private int concReducers; - - /** - * Starts child process runner. - */ - public void start(HadoopExternalCommunication comm, HadoopProcessDescriptor nodeDesc, - ExecutorService msgExecSvc, IgniteLogger parentLog) - throws IgniteCheckedException { - this.comm = comm; - this.nodeDesc = nodeDesc; - this.msgExecSvc = msgExecSvc; - - comm.setListener(new MessageListener()); - log = parentLog.getLogger(HadoopChildProcessRunner.class); - - startTime = U.currentTimeMillis(); - - // At this point node knows that this process has started. - comm.sendMessage(this.nodeDesc, new HadoopProcessStartedAck()); - } - - /** - * Initializes process for task execution. - * - * @param req Initialization request. - */ - private void prepareProcess(HadoopPrepareForJobRequest req) { - if (initGuard.compareAndSet(false, true)) { - try { - if (log.isDebugEnabled()) - log.debug("Initializing external hadoop task: " + req); - - assert job == null; - - job = req.jobInfo().createJob(HadoopV2Job.class, req.jobId(), log, null, new HadoopHelperImpl()); - - job.initialize(true, nodeDesc.processId()); - - shuffleJob = new HadoopShuffleJob<>(comm.localProcessDescriptor(), log, job, mem, - req.totalReducerCount(), req.localReducers()); - - initializeExecutors(req); - - if (log.isDebugEnabled()) - log.debug("External process initialized [initWaitTime=" + - (U.currentTimeMillis() - startTime) + ']'); - - initFut.onDone(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to initialize process: " + req, e); - - initFut.onDone(e); - } - } - else - log.warning("Duplicate initialize process request received (will ignore): " + req); - } - - /** - * @param req Task execution request. - */ - private void runTasks(final HadoopTaskExecutionRequest req) { - if (!initFut.isDone() && log.isDebugEnabled()) - log.debug("Will wait for process initialization future completion: " + req); - - initFut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> f) { - try { - // Make sure init was successful. - f.get(); - - boolean set = pendingTasks.compareAndSet(0, req.tasks().size()); - - assert set; - - HadoopTaskInfo info = F.first(req.tasks()); - - assert info != null; - - int size = info.type() == MAP ? concMappers : concReducers; - -// execSvc.setCorePoolSize(size); -// execSvc.setMaximumPoolSize(size); - - if (log.isDebugEnabled()) - log.debug("Set executor service size for task type [type=" + info.type() + - ", size=" + size + ']'); - - for (HadoopTaskInfo taskInfo : req.tasks()) { - if (log.isDebugEnabled()) - log.debug("Submitted task for external execution: " + taskInfo); - - execSvc.submit(new HadoopRunnableTask(log, job, mem, taskInfo, nodeDesc.parentNodeId()) { - @Override protected void onTaskFinished(HadoopTaskStatus status) { - onTaskFinished0(this, status); - } - - @Override protected HadoopTaskInput createInput(HadoopTaskContext ctx) - throws IgniteCheckedException { - return shuffleJob.input(ctx); - } - - @Override protected HadoopTaskOutput createOutput(HadoopTaskContext ctx) - throws IgniteCheckedException { - return shuffleJob.output(ctx); - } - }); - } - } - catch (IgniteCheckedException e) { - for (HadoopTaskInfo info : req.tasks()) - notifyTaskFinished(info, new HadoopTaskStatus(HadoopTaskState.FAILED, e), false); - } - } - }); - } - - /** - * Creates executor services. - * - * @param req Init child process request. - */ - private void initializeExecutors(HadoopPrepareForJobRequest req) { - int cpus = Runtime.getRuntime().availableProcessors(); -// -// concMappers = get(req.jobInfo(), EXTERNAL_CONCURRENT_MAPPERS, cpus); -// concReducers = get(req.jobInfo(), EXTERNAL_CONCURRENT_REDUCERS, cpus); - - execSvc = new HadoopExecutorService(log, "", cpus * 2, 1024); - } - - /** - * Updates external process map so that shuffle can proceed with sending messages to reducers. - * - * @param req Update request. - */ - private void updateTasks(final HadoopJobInfoUpdateRequest req) { - initFut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> gridFut) { - assert initGuard.get(); - - assert req.jobId().equals(job.id()); - - if (req.reducersAddresses() != null) { - if (shuffleJob.initializeReduceAddresses(req.reducersAddresses())) { - shuffleJob.startSending("external", - new IgniteInClosure2X<HadoopProcessDescriptor, HadoopShuffleMessage>() { - @Override public void applyx(HadoopProcessDescriptor dest, - HadoopShuffleMessage msg) throws IgniteCheckedException { - comm.sendMessage(dest, msg); - } - }); - } - } - } - }); - } - - /** - * Stops all executors and running tasks. - */ - private void shutdown() { - if (execSvc != null) - execSvc.shutdown(5000); - - if (msgExecSvc != null) - msgExecSvc.shutdownNow(); - - try { - job.dispose(true); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to dispose job.", e); - } - } - - /** - * Notifies node about task finish. - * - * @param run Finished task runnable. - * @param status Task status. - */ - private void onTaskFinished0(HadoopRunnableTask run, HadoopTaskStatus status) { - HadoopTaskInfo info = run.taskInfo(); - - int pendingTasks0 = pendingTasks.decrementAndGet(); - - if (log.isDebugEnabled()) - log.debug("Hadoop task execution finished [info=" + info - + ", state=" + status.state() + ", waitTime=" + run.waitTime() + ", execTime=" + run.executionTime() + - ", pendingTasks=" + pendingTasks0 + - ", err=" + status.failCause() + ']'); - - assert info.type() == MAP || info.type() == REDUCE : "Only MAP or REDUCE tasks are supported."; - - boolean flush = pendingTasks0 == 0 && info.type() == MAP; - - notifyTaskFinished(info, status, flush); - } - - /** - * @param taskInfo Finished task info. - * @param status Task status. - */ - private void notifyTaskFinished(final HadoopTaskInfo taskInfo, final HadoopTaskStatus status, - boolean flush) { - - final HadoopTaskState state = status.state(); - final Throwable err = status.failCause(); - - if (!flush) { - try { - if (log.isDebugEnabled()) - log.debug("Sending notification to parent node [taskInfo=" + taskInfo + ", state=" + state + - ", err=" + err + ']'); - - comm.sendMessage(nodeDesc, new HadoopTaskFinishedMessage(taskInfo, status)); - } - catch (IgniteCheckedException e) { - log.error("Failed to send message to parent node (will terminate child process).", e); - - shutdown(); - - terminate(); - } - } - else { - if (log.isDebugEnabled()) - log.debug("Flushing shuffle messages before sending last task completion notification [taskInfo=" + - taskInfo + ", state=" + state + ", err=" + err + ']'); - - final long start = U.currentTimeMillis(); - - try { - shuffleJob.flush().listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> f) { - long end = U.currentTimeMillis(); - - if (log.isDebugEnabled()) - log.debug("Finished flushing shuffle messages [taskInfo=" + taskInfo + - ", flushTime=" + (end - start) + ']'); - - try { - // Check for errors on shuffle. - f.get(); - - notifyTaskFinished(taskInfo, status, false); - } - catch (IgniteCheckedException e) { - log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo + - ", state=" + state + ", err=" + err + ']', e); - - notifyTaskFinished(taskInfo, - new HadoopTaskStatus(HadoopTaskState.FAILED, e), false); - } - } - }); - } - catch (IgniteCheckedException e) { - log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo + - ", state=" + state + ", err=" + err + ']', e); - - notifyTaskFinished(taskInfo, new HadoopTaskStatus(HadoopTaskState.FAILED, e), false); - } - } - } - - /** - * Checks if message was received from parent node and prints warning if not. - * - * @param desc Sender process ID. - * @param msg Received message. - * @return {@code True} if received from parent node. - */ - private boolean validateNodeMessage(HadoopProcessDescriptor desc, HadoopMessage msg) { - if (!nodeDesc.processId().equals(desc.processId())) { - log.warning("Received process control request from unknown process (will ignore) [desc=" + desc + - ", msg=" + msg + ']'); - - return false; - } - - return true; - } - - /** - * Stops execution of this process. - */ - private void terminate() { - System.exit(1); - } - - /** - * Message listener. - */ - private class MessageListener implements HadoopMessageListener { - /** {@inheritDoc} */ - @Override public void onMessageReceived(final HadoopProcessDescriptor desc, final HadoopMessage msg) { - if (msg instanceof HadoopTaskExecutionRequest) { - if (validateNodeMessage(desc, msg)) - runTasks((HadoopTaskExecutionRequest)msg); - } - else if (msg instanceof HadoopJobInfoUpdateRequest) { - if (validateNodeMessage(desc, msg)) - updateTasks((HadoopJobInfoUpdateRequest)msg); - } - else if (msg instanceof HadoopPrepareForJobRequest) { - if (validateNodeMessage(desc, msg)) - prepareProcess((HadoopPrepareForJobRequest)msg); - } - else if (msg instanceof HadoopShuffleMessage) { - if (log.isTraceEnabled()) - log.trace("Received shuffle message [desc=" + desc + ", msg=" + msg + ']'); - - initFut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> f) { - try { - HadoopShuffleMessage m = (HadoopShuffleMessage)msg; - - shuffleJob.onShuffleMessage(m); - - comm.sendMessage(desc, new HadoopShuffleAck(m.id(), m.jobId())); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to process hadoop shuffle message [desc=" + desc + ", msg=" + msg + ']', e); - } - } - }); - } - else if (msg instanceof HadoopShuffleAck) { - if (log.isTraceEnabled()) - log.trace("Received shuffle ack [desc=" + desc + ", msg=" + msg + ']'); - - shuffleJob.onShuffleAck((HadoopShuffleAck)msg); - } - else - log.warning("Unknown message received (will ignore) [desc=" + desc + ", msg=" + msg + ']'); - } - - /** {@inheritDoc} */ - @Override public void onConnectionLost(HadoopProcessDescriptor desc) { - if (log.isDebugEnabled()) - log.debug("Lost connection with remote process: " + desc); - - if (desc == null) - U.warn(log, "Handshake failed."); - else if (desc.processId().equals(nodeDesc.processId())) { - log.warning("Child process lost connection with parent node (will terminate child process)."); - - shutdown(); - - terminate(); - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/child/HadoopExternalProcessStarter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/child/HadoopExternalProcessStarter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/child/HadoopExternalProcessStarter.java deleted file mode 100644 index a2345b0..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/child/HadoopExternalProcessStarter.java +++ /dev/null @@ -1,301 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.child; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.io.PrintStream; -import java.net.URL; -import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.HadoopProcessDescriptor; -import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.communication.HadoopExternalCommunication; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteClosure; -import org.apache.ignite.logger.log4j.Log4JLogger; -import org.apache.ignite.marshaller.jdk.JdkMarshaller; - -/** - * Hadoop external process base class. - */ -public class HadoopExternalProcessStarter { - /** Path to Log4j configuration file. */ - public static final String DFLT_LOG4J_CONFIG = "config/ignite-log4j.xml"; - - /** Arguments. */ - private Args args; - - /** System out. */ - private OutputStream out; - - /** System err. */ - private OutputStream err; - - /** - * @param args Parsed arguments. - */ - public HadoopExternalProcessStarter(Args args) { - this.args = args; - } - - /** - * @param cmdArgs Process arguments. - */ - public static void main(String[] cmdArgs) { - try { - Args args = arguments(cmdArgs); - - new HadoopExternalProcessStarter(args).run(); - } - catch (Exception e) { - System.err.println("Failed"); - - System.err.println(e.getMessage()); - - e.printStackTrace(System.err); - } - } - - /** - * - * @throws Exception - */ - public void run() throws Exception { - U.setWorkDirectory(args.workDir, U.getIgniteHome()); - - File outputDir = outputDirectory(); - - initializeStreams(outputDir); - - ExecutorService msgExecSvc = Executors.newFixedThreadPool( - Integer.getInteger("MSG_THREAD_POOL_SIZE", Runtime.getRuntime().availableProcessors() * 2)); - - IgniteLogger log = logger(outputDir); - - HadoopExternalCommunication comm = new HadoopExternalCommunication( - args.nodeId, - args.childProcId, - new JdkMarshaller(), - log, - msgExecSvc, - "external" - ); - - comm.start(); - - HadoopProcessDescriptor nodeDesc = new HadoopProcessDescriptor(args.nodeId, args.parentProcId); - nodeDesc.address(args.addr); - nodeDesc.tcpPort(args.tcpPort); - nodeDesc.sharedMemoryPort(args.shmemPort); - - HadoopChildProcessRunner runner = new HadoopChildProcessRunner(); - - runner.start(comm, nodeDesc, msgExecSvc, log); - - System.err.println("Started"); - System.err.flush(); - - System.setOut(new PrintStream(out)); - System.setErr(new PrintStream(err)); - } - - /** - * @param outputDir Directory for process output. - * @throws Exception - */ - private void initializeStreams(File outputDir) throws Exception { - out = new FileOutputStream(new File(outputDir, args.childProcId + ".out")); - err = new FileOutputStream(new File(outputDir, args.childProcId + ".err")); - } - - /** - * @return Path to output directory. - * @throws IOException If failed. - */ - private File outputDirectory() throws IOException { - File f = new File(args.out); - - if (!f.exists()) { - if (!f.mkdirs()) - throw new IOException("Failed to create output directory: " + args.out); - } - else { - if (f.isFile()) - throw new IOException("Output directory is a file: " + args.out); - } - - return f; - } - - /** - * @param outputDir Directory for process output. - * @return Logger. - */ - private IgniteLogger logger(final File outputDir) { - final URL url = U.resolveIgniteUrl(DFLT_LOG4J_CONFIG); - - Log4JLogger logger; - - try { - logger = url != null ? new Log4JLogger(url) : new Log4JLogger(true); - } - catch (IgniteCheckedException e) { - System.err.println("Failed to create URL-based logger. Will use default one."); - - e.printStackTrace(); - - logger = new Log4JLogger(true); - } - - logger.updateFilePath(new IgniteClosure<String, String>() { - @Override public String apply(String s) { - return new File(outputDir, args.childProcId + ".log").getAbsolutePath(); - } - }); - - return logger; - } - - /** - * @param processArgs Process arguments. - * @return Child process instance. - */ - private static Args arguments(String[] processArgs) throws Exception { - Args args = new Args(); - - for (int i = 0; i < processArgs.length; i++) { - String arg = processArgs[i]; - - switch (arg) { - case "-cpid": { - if (i == processArgs.length - 1) - throw new Exception("Missing process ID for '-cpid' parameter"); - - String procIdStr = processArgs[++i]; - - args.childProcId = UUID.fromString(procIdStr); - - break; - } - - case "-ppid": { - if (i == processArgs.length - 1) - throw new Exception("Missing process ID for '-ppid' parameter"); - - String procIdStr = processArgs[++i]; - - args.parentProcId = UUID.fromString(procIdStr); - - break; - } - - case "-nid": { - if (i == processArgs.length - 1) - throw new Exception("Missing node ID for '-nid' parameter"); - - String nodeIdStr = processArgs[++i]; - - args.nodeId = UUID.fromString(nodeIdStr); - - break; - } - - case "-addr": { - if (i == processArgs.length - 1) - throw new Exception("Missing node address for '-addr' parameter"); - - args.addr = processArgs[++i]; - - break; - } - - case "-tport": { - if (i == processArgs.length - 1) - throw new Exception("Missing tcp port for '-tport' parameter"); - - args.tcpPort = Integer.parseInt(processArgs[++i]); - - break; - } - - case "-sport": { - if (i == processArgs.length - 1) - throw new Exception("Missing shared memory port for '-sport' parameter"); - - args.shmemPort = Integer.parseInt(processArgs[++i]); - - break; - } - - case "-out": { - if (i == processArgs.length - 1) - throw new Exception("Missing output folder name for '-out' parameter"); - - args.out = processArgs[++i]; - - break; - } - - case "-wd": { - if (i == processArgs.length - 1) - throw new Exception("Missing work folder name for '-wd' parameter"); - - args.workDir = processArgs[++i]; - - break; - } - } - } - - return args; - } - - /** - * Execution arguments. - */ - private static class Args { - /** Process ID. */ - private UUID childProcId; - - /** Process ID. */ - private UUID parentProcId; - - /** Process ID. */ - private UUID nodeId; - - /** Node address. */ - private String addr; - - /** TCP port */ - private int tcpPort; - - /** Shmem port. */ - private int shmemPort = -1; - - /** Output folder. */ - private String out; - - /** Work directory. */ - private String workDir; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopAbstractCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopAbstractCommunicationClient.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopAbstractCommunicationClient.java deleted file mode 100644 index b421267..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopAbstractCommunicationClient.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.communication; - -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; - -/** - * Implements basic lifecycle for communication clients. - */ -public abstract class HadoopAbstractCommunicationClient implements HadoopCommunicationClient { - /** Time when this client was last used. */ - private volatile long lastUsed = U.currentTimeMillis(); - - /** Reservations. */ - private final AtomicInteger reserves = new AtomicInteger(); - - /** {@inheritDoc} */ - @Override public boolean close() { - return reserves.compareAndSet(0, -1); - } - - /** {@inheritDoc} */ - @Override public void forceClose() { - reserves.set(-1); - } - - /** {@inheritDoc} */ - @Override public boolean closed() { - return reserves.get() == -1; - } - - /** {@inheritDoc} */ - @Override public boolean reserve() { - while (true) { - int r = reserves.get(); - - if (r == -1) - return false; - - if (reserves.compareAndSet(r, r + 1)) - return true; - } - } - - /** {@inheritDoc} */ - @Override public void release() { - while (true) { - int r = reserves.get(); - - if (r == -1) - return; - - if (reserves.compareAndSet(r, r - 1)) - return; - } - } - - /** {@inheritDoc} */ - @Override public boolean reserved() { - return reserves.get() > 0; - } - - /** {@inheritDoc} */ - @Override public long getIdleTime() { - return U.currentTimeMillis() - lastUsed; - } - - /** - * Updates used time. - */ - protected void markUsed() { - lastUsed = U.currentTimeMillis(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopAbstractCommunicationClient.class, this); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopCommunicationClient.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopCommunicationClient.java deleted file mode 100644 index 623c260..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopCommunicationClient.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.communication; - -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; -import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.HadoopProcessDescriptor; - -/** - * - */ -public interface HadoopCommunicationClient { - /** - * @return {@code True} if client has been closed by this call, - * {@code false} if failed to close client (due to concurrent reservation or concurrent close). - */ - public boolean close(); - - /** - * Forces client close. - */ - public void forceClose(); - - /** - * @return {@code True} if client is closed; - */ - public boolean closed(); - - /** - * @return {@code True} if client was reserved, {@code false} otherwise. - */ - public boolean reserve(); - - /** - * Releases this client by decreasing reservations. - */ - public void release(); - - /** - * @return {@code True} if client was reserved. - */ - public boolean reserved(); - - /** - * Gets idle time of this client. - * - * @return Idle time of this client. - */ - public long getIdleTime(); - - /** - * @param desc Process descriptor. - * @param msg Message to send. - * @throws IgniteCheckedException If failed. - */ - public void sendMessage(HadoopProcessDescriptor desc, HadoopMessage msg) throws IgniteCheckedException; -} \ No newline at end of file