http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
new file mode 100644
index 0000000..f4ee97f
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.IgniteIllegalStateException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsFile;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.igfs.IgfsPathSummary;
+import org.apache.ignite.internal.processors.igfs.IgfsEx;
+import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
+import org.apache.ignite.internal.processors.igfs.IgfsStatus;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.IgniteState.STARTED;
+import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint.LOCALHOST;
+import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_EMBED;
+import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM;
+import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP;
+import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.parameter;
+
+/**
+ * Wrapper for IGFS server.
+ */
+public class HadoopIgfsWrapper implements HadoopIgfs {
+    /** Delegate. */
+    private final AtomicReference<Delegate> delegateRef = new AtomicReference<>();
+
+    /** Authority. */
+    private final String authority;
+
+    /** Connection string. */
+    private final HadoopIgfsEndpoint endpoint;
+
+    /** Log directory. */
+    private final String logDir;
+
+    /** Configuration. */
+    private final Configuration conf;
+
+    /** Logger. */
+    private final Log log;
+
+    /** The user name this wrapper works on behalf of. */
+    private final String userName;
+
+    /**
+     * Constructor.
+     *
+     * @param authority Authority (connection string).
+     * @param logDir Log directory for server.
+     * @param conf Configuration.
+     * @param log Current logger.
+     */
+    public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log, String user)
+        throws IOException {
+        try {
+            this.authority = authority;
+            this.endpoint = new HadoopIgfsEndpoint(authority);
+            this.logDir = logDir;
+            this.conf = conf;
+            this.log = log;
+            this.userName = user;
+        }
+        catch (IgniteCheckedException e) {
+            throw new IOException("Failed to parse endpoint: " + authority, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsHandshakeResponse handshake(String logDir) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<IgfsHandshakeResponse>() {
+            @Override public IgfsHandshakeResponse apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) {
+                return hndResp;
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close(boolean force) {
+        Delegate delegate = delegateRef.get();
+
+        if (delegate != null && delegateRef.compareAndSet(delegate, null))
+            delegate.close(force);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsFile info(final IgfsPath path) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<IgfsFile>() {
+            @Override public IgfsFile apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.info(path);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<IgfsFile>() {
+            @Override public IgfsFile apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.update(path, props);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime)
+        throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Boolean>() {
+            @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.setTimes(path, accessTime, modificationTime);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Boolean>() {
+            @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.rename(src, dest);
+            }
+        }, src);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Boolean>() {
+            @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.delete(path, recursive);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start,
+        final long len) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Collection<IgfsBlockLocation>>() {
+            @Override public Collection<IgfsBlockLocation> apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+                return hadoop.affinity(path, start, len);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<IgfsPathSummary>() {
+            @Override public IgfsPathSummary apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.contentSummary(path);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Boolean>() {
+            @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.mkdirs(path, props);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Collection<IgfsFile>>() {
+            @Override public Collection<IgfsFile> apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+                return hadoop.listFiles(path);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Collection<IgfsPath>>() {
+            @Override public Collection<IgfsPath> apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+                return hadoop.listPaths(path);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsStatus fsStatus() throws IOException {
+        return withReconnectHandling(new FileSystemClosure<IgfsStatus>() {
+            @Override public IgfsStatus apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.fsStatus();
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
+            @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+                return hadoop.open(path);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch)
+        throws IOException {
+        return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
+            @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+                return hadoop.open(path, seqReadsBeforePrefetch);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite,
+        final boolean colocate, final int replication, final long blockSize, @Nullable final Map<String, String> props)
+        throws IOException {
+        return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
+            @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+                return hadoop.create(path, overwrite, colocate, replication, blockSize, props);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create,
+        @Nullable final Map<String, String> props) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
+            @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+                return hadoop.append(path, create, props);
+            }
+        }, path);
+    }
+
+    /**
+     * Execute closure which is not path-specific.
+     *
+     * @param clo Closure.
+     * @return Result.
+     * @throws IOException If failed.
+     */
+    private <T> T withReconnectHandling(FileSystemClosure<T> clo) throws IOException {
+        return withReconnectHandling(clo, null);
+    }
+
+    /**
+     * Execute closure.
+     *
+     * @param clo Closure.
+     * @param path Path for exceptions.
+     * @return Result.
+     * @throws IOException If failed.
+     */
+    private <T> T withReconnectHandling(final FileSystemClosure<T> clo, @Nullable IgfsPath path)
+        throws IOException {
+        Exception err = null;
+
+        for (int i = 0; i < 2; i++) {
+            Delegate curDelegate = null;
+
+            boolean close = false;
+            boolean force = false;
+
+            try {
+                curDelegate = delegate();
+
+                assert curDelegate != null;
+
+                close = curDelegate.doomed;
+
+                return clo.apply(curDelegate.hadoop, curDelegate.hndResp);
+            }
+            catch (HadoopIgfsCommunicationException e) {
+                if (curDelegate != null && !curDelegate.doomed) {
+                    // Try getting rid of the faulty delegate ASAP.
+                    delegateRef.compareAndSet(curDelegate, null);
+
+                    close = true;
+                    force = true;
+                }
+
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send message to a server: " + e);
+
+                err = e;
+            }
+            catch (IgniteCheckedException e) {
+                throw HadoopIgfsUtils.cast(e, path != null ? path.toString() : null);
+            }
+            finally {
+                if (close) {
+                    assert curDelegate != null;
+
+                    curDelegate.close(force);
+                }
+            }
+        }
+
+        List<Throwable> list = X.getThrowableList(err);
+
+        Throwable cause = list.get(list.size() - 1);
+
+        throw new IOException("Failed to communicate with IGFS: "
+            + (cause.getMessage() == null ? cause.toString() : cause.getMessage()), err);
+    }
+
+    /**
+     * Get delegate creating it if needed.
+     *
+     * @return Delegate.
+     */
+    private Delegate delegate() throws HadoopIgfsCommunicationException {
+        // These fields will contain possible exceptions from shmem and TCP endpoints.
+        Exception errShmem = null;
+        Exception errTcp = null;
+
+        // 1. If delegate is set, return it immediately.
+        Delegate curDelegate = delegateRef.get();
+
+        if (curDelegate != null)
+            return curDelegate;
+
+        // 2. Guess that we are in the same VM.
+        boolean skipInProc = parameter(conf, PARAM_IGFS_ENDPOINT_NO_EMBED, authority, false);
+
+        if (!skipInProc) {
+            IgfsEx igfs = getIgfsEx(endpoint.grid(), endpoint.igfs());
+
+            if (igfs != null) {
+                HadoopIgfsEx hadoop = null;
+
+                try {
+                    hadoop = new HadoopIgfsInProc(igfs, log, userName);
+
+                    curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
+                }
+                catch (IOException | IgniteCheckedException e) {
+                    if (e instanceof HadoopIgfsCommunicationException)
+                        if (hadoop != null)
+                            hadoop.close(true);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to connect to in-process IGFS, 
fallback to IPC mode.", e);
+                }
+            }
+        }
+
+        // 3. Try connecting using shmem.
+        boolean skipLocShmem = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority, false);
+
+        if (curDelegate == null && !skipLocShmem && !U.isWindows()) {
+            HadoopIgfsEx hadoop = null;
+
+            try {
+                hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log, userName);
+
+                curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
+            }
+            catch (IOException | IgniteCheckedException e) {
+                if (e instanceof HadoopIgfsCommunicationException)
+                    hadoop.close(true);
+
+                if (log.isDebugEnabled())
+                    log.debug("Failed to connect to IGFS using shared memory 
[port=" + endpoint.port() + ']', e);
+
+                errShmem = e;
+            }
+        }
+
+        // 4. Try local TCP connection.
+        boolean skipLocTcp = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP, authority, false);
+
+        if (curDelegate == null && !skipLocTcp) {
+            HadoopIgfsEx hadoop = null;
+
+            try {
+                hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(),
+                    log, userName);
+
+                curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
+            }
+            catch (IOException | IgniteCheckedException e) {
+                if (e instanceof HadoopIgfsCommunicationException)
+                    hadoop.close(true);
+
+                if (log.isDebugEnabled())
+                    log.debug("Failed to connect to IGFS using TCP [host=" + 
endpoint.host() +
+                        ", port=" + endpoint.port() + ']', e);
+
+                errTcp = e;
+            }
+        }
+
+        // 5. Try remote TCP connection.
+        if (curDelegate == null && (skipLocTcp || !F.eq(LOCALHOST, endpoint.host()))) {
+            HadoopIgfsEx hadoop = null;
+
+            try {
+                hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(),
+                    log, userName);
+
+                curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
+            }
+            catch (IOException | IgniteCheckedException e) {
+                if (e instanceof HadoopIgfsCommunicationException)
+                    hadoop.close(true);
+
+                if (log.isDebugEnabled())
+                    log.debug("Failed to connect to IGFS using TCP [host=" + 
endpoint.host() +
+                        ", port=" + endpoint.port() + ']', e);
+
+                errTcp = e;
+            }
+        }
+
+        if (curDelegate != null) {
+            if (!delegateRef.compareAndSet(null, curDelegate))
+                curDelegate.doomed = true;
+
+            return curDelegate;
+        }
+        else {
+            SB errMsg = new SB("Failed to connect to IGFS [endpoint=igfs://" + authority + ", attempts=[");
+
+            if (errShmem != null)
+                errMsg.a("[type=SHMEM, port=" + endpoint.port() + ", err=" + 
errShmem + "], ");
+
+            errMsg.a("[type=TCP, host=" + endpoint.host() + ", port=" + 
endpoint.port() + ", err=" + errTcp + "]] ");
+
+            errMsg.a("(ensure that IGFS is running and have IPC endpoint 
enabled; ensure that " +
+                "ignite-shmem-1.0.0.jar is in Hadoop classpath if you use 
shared memory endpoint).");
+
+            throw new HadoopIgfsCommunicationException(errMsg.toString());
+        }
+    }
+
+    /**
+     * File system operation closure.
+     */
+    private static interface FileSystemClosure<T> {
+        /**
+         * Call closure body.
+         *
+         * @param hadoop RPC handler.
+         * @param hndResp Handshake response.
+         * @return Result.
+         * @throws IgniteCheckedException If failed.
+         * @throws IOException If failed.
+         */
+        public T apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException;
+    }
+
+    /**
+     * Delegate.
+     */
+    private static class Delegate {
+        /** RPC handler. */
+        private final HadoopIgfsEx hadoop;
+
+        /** Handshake response. */
+        private final IgfsHandshakeResponse hndResp;
+
+        /** Close guard. */
+        private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+        /** Whether this delegate must be closed at the end of the next invocation. */
+        private boolean doomed;
+
+        /**
+         * Constructor.
+         *
+         * @param hadoop Hadoop.
+         * @param hndResp Handshake response.
+         */
+        private Delegate(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) {
+            this.hadoop = hadoop;
+            this.hndResp = hndResp;
+        }
+
+        /**
+         * Close underlying RPC handler.
+         *
+         * @param force Force flag.
+         */
+        private void close(boolean force) {
+            if (closeGuard.compareAndSet(false, true))
+                hadoop.close(force);
+        }
+    }
+
+    /**
+     * Helper method to find Igfs of the given name in the given Ignite instance.
+     *
+     * @param gridName The name of the grid to check.
+     * @param igfsName The name of Igfs.
+     * @return The file system instance, or null if not found.
+     */
+    private static IgfsEx getIgfsEx(@Nullable String gridName, @Nullable String igfsName) {
+        if (Ignition.state(gridName) == STARTED) {
+            try {
+                for (IgniteFileSystem fs : Ignition.ignite(gridName).fileSystems()) {
+                    if (F.eq(fs.name(), igfsName))
+                        return (IgfsEx)fs;
+                }
+            }
+            catch (IgniteIllegalStateException ignore) {
+                // May happen if the grid state has changed.
+            }
+        }
+
+        return null;
+    }
+}
\ No newline at end of file
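The wrapper above picks its transport lazily in delegate(): an in-process IGFS in the same JVM first, then a local shared-memory endpoint, then local TCP, then remote TCP, while withReconnectHandling() drops a faulty delegate and retries each operation once. A minimal caller-side sketch, not part of the commit; the authority string, log directory, and user name are illustrative only, and a reachable IGFS node is assumed:

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.ignite.igfs.IgfsPath;
    import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsWrapper;

    public class HadoopIgfsWrapperUsageSketch {
        public static void main(String[] args) throws Exception {
            Log log = LogFactory.getLog(HadoopIgfsWrapperUsageSketch.class);

            // Transport is chosen lazily on the first call (see delegate() above).
            HadoopIgfsWrapper igfs = new HadoopIgfsWrapper(
                "igfs@localhost:10500",  // Illustrative authority, not from the commit.
                "/tmp/igfs-client-log",  // Illustrative log directory.
                new Configuration(),
                log,
                "hadoop-user");          // Illustrative user name.

            try {
                // Each operation runs through withReconnectHandling(), which retries
                // once with a freshly created delegate on a communication error.
                System.out.println("IGFS status: " + igfs.fsStatus());
                System.out.println("Root info: " + igfs.info(new IgfsPath("/")));
            }
            finally {
                igfs.close(false);
            }
        }
    }

If every transport fails, the wrapper throws the HadoopIgfsCommunicationException assembled at the end of delegate(), listing the attempted endpoints.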

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
new file mode 100644
index 0000000..090b336
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.jobtracker;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_SETUP;
+
+/**
+ * Hadoop job metadata. Internal object used for distributed job state tracking.
+ */
+public class HadoopJobMetadata implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Job ID. */
+    private HadoopJobId jobId;
+
+    /** Job info. */
+    private HadoopJobInfo jobInfo;
+
+    /** Node submitted job. */
+    private UUID submitNodeId;
+
+    /** Map-reduce plan. */
+    private HadoopMapReducePlan mrPlan;
+
+    /** Pending splits for which mapper should be executed. */
+    private Map<HadoopInputSplit, Integer> pendingSplits;
+
+    /** Pending reducers. */
+    private Collection<Integer> pendingReducers;
+
+    /** Reducers addresses. */
+    @GridToStringInclude
+    private Map<Integer, HadoopProcessDescriptor> reducersAddrs;
+
+    /** Job phase. */
+    private HadoopJobPhase phase = PHASE_SETUP;
+
+    /** Fail cause. */
+    @GridToStringExclude
+    private Throwable failCause;
+
+    /** Version. */
+    private long ver;
+
+    /** Job counters */
+    private HadoopCounters counters = new HadoopCountersImpl();
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public HadoopJobMetadata() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param submitNodeId Submit node ID.
+     * @param jobId Job ID.
+     * @param jobInfo Job info.
+     */
+    public HadoopJobMetadata(UUID submitNodeId, HadoopJobId jobId, HadoopJobInfo jobInfo) {
+        this.jobId = jobId;
+        this.jobInfo = jobInfo;
+        this.submitNodeId = submitNodeId;
+    }
+
+    /**
+     * Copy constructor.
+     *
+     * @param src Metadata to copy.
+     */
+    public HadoopJobMetadata(HadoopJobMetadata src) {
+        // Make sure to preserve alphabetic order.
+        counters = src.counters;
+        failCause = src.failCause;
+        jobId = src.jobId;
+        jobInfo = src.jobInfo;
+        mrPlan = src.mrPlan;
+        pendingSplits = src.pendingSplits;
+        pendingReducers = src.pendingReducers;
+        phase = src.phase;
+        reducersAddrs = src.reducersAddrs;
+        submitNodeId = src.submitNodeId;
+        ver = src.ver + 1;
+    }
+
+    /**
+     * @return Submit node ID.
+     */
+    public UUID submitNodeId() {
+        return submitNodeId;
+    }
+
+    /**
+     * @param phase Job phase.
+     */
+    public void phase(HadoopJobPhase phase) {
+        this.phase = phase;
+    }
+
+    /**
+     * @return Job phase.
+     */
+    public HadoopJobPhase phase() {
+        return phase;
+    }
+
+    /**
+     * Gets reducers addresses for external execution.
+     *
+     * @return Reducers addresses.
+     */
+    public Map<Integer, HadoopProcessDescriptor> reducersAddresses() {
+        return reducersAddrs;
+    }
+
+    /**
+     * Sets reducers addresses for external execution.
+     *
+     * @param reducersAddrs Map of addresses.
+     */
+    public void reducersAddresses(Map<Integer, HadoopProcessDescriptor> reducersAddrs) {
+        this.reducersAddrs = reducersAddrs;
+    }
+
+    /**
+     * Sets collection of pending splits.
+     *
+     * @param pendingSplits Collection of pending splits.
+     */
+    public void pendingSplits(Map<HadoopInputSplit, Integer> pendingSplits) {
+        this.pendingSplits = pendingSplits;
+    }
+
+    /**
+     * Gets collection of pending splits.
+     *
+     * @return Collection of pending splits.
+     */
+    public Map<HadoopInputSplit, Integer> pendingSplits() {
+        return pendingSplits;
+    }
+
+    /**
+     * Sets collection of pending reducers.
+     *
+     * @param pendingReducers Collection of pending reducers.
+     */
+    public void pendingReducers(Collection<Integer> pendingReducers) {
+        this.pendingReducers = pendingReducers;
+    }
+
+    /**
+     * Gets collection of pending reducers.
+     *
+     * @return Collection of pending reducers.
+     */
+    public Collection<Integer> pendingReducers() {
+        return pendingReducers;
+    }
+
+    /**
+     * @return Job ID.
+     */
+    public HadoopJobId jobId() {
+        return jobId;
+    }
+
+    /**
+     * @param mrPlan Map-reduce plan.
+     */
+    public void mapReducePlan(HadoopMapReducePlan mrPlan) {
+        assert this.mrPlan == null : "Map-reduce plan can only be initialized once.";
+
+        this.mrPlan = mrPlan;
+    }
+
+    /**
+     * @return Map-reduce plan.
+     */
+    public HadoopMapReducePlan mapReducePlan() {
+        return mrPlan;
+    }
+
+    /**
+     * @return Job info.
+     */
+    public HadoopJobInfo jobInfo() {
+        return jobInfo;
+    }
+
+    /**
+     * Returns job counters.
+     *
+     * @return Collection of counters.
+     */
+    public HadoopCounters counters() {
+        return counters;
+    }
+
+    /**
+     * Sets counters.
+     *
+     * @param counters Collection of counters.
+     */
+    public void counters(HadoopCounters counters) {
+        this.counters = counters;
+    }
+
+    /**
+     * @param failCause Fail cause.
+     */
+    public void failCause(Throwable failCause) {
+        assert failCause != null;
+
+        if (this.failCause == null) // Keep the first error.
+            this.failCause = failCause;
+    }
+
+    /**
+     * @return Fail cause.
+     */
+    public Throwable failCause() {
+        return failCause;
+    }
+
+    /**
+     * @return Version.
+     */
+    public long version() {
+        return ver;
+    }
+
+    /**
+     * @param split Split.
+     * @return Task number.
+     */
+    public int taskNumber(HadoopInputSplit split) {
+        return pendingSplits.get(split);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeUuid(out, submitNodeId);
+        out.writeObject(jobId);
+        out.writeObject(jobInfo);
+        out.writeObject(mrPlan);
+        out.writeObject(pendingSplits);
+        out.writeObject(pendingReducers);
+        out.writeObject(phase);
+        out.writeObject(failCause);
+        out.writeLong(ver);
+        out.writeObject(reducersAddrs);
+        out.writeObject(counters);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        submitNodeId = U.readUuid(in);
+        jobId = (HadoopJobId)in.readObject();
+        jobInfo = (HadoopJobInfo)in.readObject();
+        mrPlan = (HadoopMapReducePlan)in.readObject();
+        pendingSplits = (Map<HadoopInputSplit,Integer>)in.readObject();
+        pendingReducers = (Collection<Integer>)in.readObject();
+        phase = (HadoopJobPhase)in.readObject();
+        failCause = (Throwable)in.readObject();
+        ver = in.readLong();
+        reducersAddrs = (Map<Integer, HadoopProcessDescriptor>)in.readObject();
+        counters = (HadoopCounters)in.readObject();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopJobMetadata.class, this, "pendingMaps", pendingSplits.size(),
+            "pendingReduces", pendingReducers.size(), "failCause", failCause == null ? null :
+                failCause.getClass().getName());
+    }
+}
\ No newline at end of file
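HadoopJobMetadata is treated as copy-on-write state: each change is applied to a copy made with the copy constructor, which bumps ver so newer metadata supersedes older entries, and the object is shipped between nodes through the Externalizable methods above. A minimal round-trip sketch, not part of the commit; the HadoopJobId(UUID, int) constructor, the random placeholder IDs, and the null job info are illustrative assumptions:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.util.UUID;
    import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
    import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobMetadata;

    public class HadoopJobMetadataRoundTripSketch {
        public static void main(String[] args) throws Exception {
            HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);

            // Job info is left null here purely to keep the sketch self-contained.
            HadoopJobMetadata meta = new HadoopJobMetadata(UUID.randomUUID(), jobId, null);

            // The copy constructor increments the version, so newer metadata wins.
            HadoopJobMetadata updated = new HadoopJobMetadata(meta);

            ByteArrayOutputStream bytes = new ByteArrayOutputStream();

            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(updated); // Delegates to writeExternal().
            }

            try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                HadoopJobMetadata read = (HadoopJobMetadata)in.readObject(); // readExternal().

                System.out.println("Version after one copy: " + read.version()); // Expected: 1.
            }
        }
    }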
