http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java new file mode 100644 index 0000000..f4ee97f --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java @@ -0,0 +1,552 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.logging.Log; +import org.apache.hadoop.conf.Configuration; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteFileSystem; +import org.apache.ignite.IgniteIllegalStateException; +import org.apache.ignite.Ignition; +import org.apache.ignite.igfs.IgfsBlockLocation; +import org.apache.ignite.igfs.IgfsFile; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.igfs.IgfsPathSummary; +import org.apache.ignite.internal.processors.igfs.IgfsEx; +import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse; +import org.apache.ignite.internal.processors.igfs.IgfsStatus; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.SB; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.IgniteState.STARTED; +import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint.LOCALHOST; +import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_EMBED; +import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM; +import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP; +import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.parameter; + +/** + * Wrapper for IGFS server. + */ +public class HadoopIgfsWrapper implements HadoopIgfs { + /** Delegate. */ + private final AtomicReference<Delegate> delegateRef = new AtomicReference<>(); + + /** Authority. */ + private final String authority; + + /** Connection string. */ + private final HadoopIgfsEndpoint endpoint; + + /** Log directory. */ + private final String logDir; + + /** Configuration. */ + private final Configuration conf; + + /** Logger. */ + private final Log log; + + /** The user name this wrapper works on behalf of. */ + private final String userName; + + /** + * Constructor. + * + * @param authority Authority (connection string). + * @param logDir Log directory for server. + * @param conf Configuration. + * @param log Current logger. + */ + public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log, String user) + throws IOException { + try { + this.authority = authority; + this.endpoint = new HadoopIgfsEndpoint(authority); + this.logDir = logDir; + this.conf = conf; + this.log = log; + this.userName = user; + } + catch (IgniteCheckedException e) { + throw new IOException("Failed to parse endpoint: " + authority, e); + } + } + + /** {@inheritDoc} */ + @Override public IgfsHandshakeResponse handshake(String logDir) throws IOException { + return withReconnectHandling(new FileSystemClosure<IgfsHandshakeResponse>() { + @Override public IgfsHandshakeResponse apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) { + return hndResp; + } + }); + } + + /** {@inheritDoc} */ + @Override public void close(boolean force) { + Delegate delegate = delegateRef.get(); + + if (delegate != null && delegateRef.compareAndSet(delegate, null)) + delegate.close(force); + } + + /** {@inheritDoc} */ + @Override public IgfsFile info(final IgfsPath path) throws IOException { + return withReconnectHandling(new FileSystemClosure<IgfsFile>() { + @Override public IgfsFile apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.info(path); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IOException { + return withReconnectHandling(new FileSystemClosure<IgfsFile>() { + @Override public IgfsFile apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.update(path, props); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime) + throws IOException { + return withReconnectHandling(new FileSystemClosure<Boolean>() { + @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.setTimes(path, accessTime, modificationTime); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IOException { + return withReconnectHandling(new FileSystemClosure<Boolean>() { + @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.rename(src, dest); + } + }, src); + } + + /** {@inheritDoc} */ + @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IOException { + return withReconnectHandling(new FileSystemClosure<Boolean>() { + @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.delete(path, recursive); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start, + final long len) throws IOException { + return withReconnectHandling(new FileSystemClosure<Collection<IgfsBlockLocation>>() { + @Override public Collection<IgfsBlockLocation> apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.affinity(path, start, len); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IOException { + return withReconnectHandling(new FileSystemClosure<IgfsPathSummary>() { + @Override public IgfsPathSummary apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.contentSummary(path); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IOException { + return withReconnectHandling(new FileSystemClosure<Boolean>() { + @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.mkdirs(path, props); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IOException { + return withReconnectHandling(new FileSystemClosure<Collection<IgfsFile>>() { + @Override public Collection<IgfsFile> apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.listFiles(path); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IOException { + return withReconnectHandling(new FileSystemClosure<Collection<IgfsPath>>() { + @Override public Collection<IgfsPath> apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.listPaths(path); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public IgfsStatus fsStatus() throws IOException { + return withReconnectHandling(new FileSystemClosure<IgfsStatus>() { + @Override public IgfsStatus apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.fsStatus(); + } + }); + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IOException { + return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.open(path); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch) + throws IOException { + return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.open(path, seqReadsBeforePrefetch); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite, + final boolean colocate, final int replication, final long blockSize, @Nullable final Map<String, String> props) + throws IOException { + return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.create(path, overwrite, colocate, replication, blockSize, props); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create, + @Nullable final Map<String, String> props) throws IOException { + return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.append(path, create, props); + } + }, path); + } + + /** + * Execute closure which is not path-specific. + * + * @param clo Closure. + * @return Result. + * @throws IOException If failed. + */ + private <T> T withReconnectHandling(FileSystemClosure<T> clo) throws IOException { + return withReconnectHandling(clo, null); + } + + /** + * Execute closure. + * + * @param clo Closure. + * @param path Path for exceptions. + * @return Result. + * @throws IOException If failed. + */ + private <T> T withReconnectHandling(final FileSystemClosure<T> clo, @Nullable IgfsPath path) + throws IOException { + Exception err = null; + + for (int i = 0; i < 2; i++) { + Delegate curDelegate = null; + + boolean close = false; + boolean force = false; + + try { + curDelegate = delegate(); + + assert curDelegate != null; + + close = curDelegate.doomed; + + return clo.apply(curDelegate.hadoop, curDelegate.hndResp); + } + catch (HadoopIgfsCommunicationException e) { + if (curDelegate != null && !curDelegate.doomed) { + // Try getting rid fo faulty delegate ASAP. + delegateRef.compareAndSet(curDelegate, null); + + close = true; + force = true; + } + + if (log.isDebugEnabled()) + log.debug("Failed to send message to a server: " + e); + + err = e; + } + catch (IgniteCheckedException e) { + throw HadoopIgfsUtils.cast(e, path != null ? path.toString() : null); + } + finally { + if (close) { + assert curDelegate != null; + + curDelegate.close(force); + } + } + } + + List<Throwable> list = X.getThrowableList(err); + + Throwable cause = list.get(list.size() - 1); + + throw new IOException("Failed to communicate with IGFS: " + + (cause.getMessage() == null ? cause.toString() : cause.getMessage()), err); + } + + /** + * Get delegate creating it if needed. + * + * @return Delegate. + */ + private Delegate delegate() throws HadoopIgfsCommunicationException { + // These fields will contain possible exceptions from shmem and TCP endpoints. + Exception errShmem = null; + Exception errTcp = null; + + // 1. If delegate is set, return it immediately. + Delegate curDelegate = delegateRef.get(); + + if (curDelegate != null) + return curDelegate; + + // 2. Guess that we are in the same VM. + boolean skipInProc = parameter(conf, PARAM_IGFS_ENDPOINT_NO_EMBED, authority, false); + + if (!skipInProc) { + IgfsEx igfs = getIgfsEx(endpoint.grid(), endpoint.igfs()); + + if (igfs != null) { + HadoopIgfsEx hadoop = null; + + try { + hadoop = new HadoopIgfsInProc(igfs, log, userName); + + curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); + } + catch (IOException | IgniteCheckedException e) { + if (e instanceof HadoopIgfsCommunicationException) + if (hadoop != null) + hadoop.close(true); + + if (log.isDebugEnabled()) + log.debug("Failed to connect to in-process IGFS, fallback to IPC mode.", e); + } + } + } + + // 3. Try connecting using shmem. + boolean skipLocShmem = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority, false); + + if (curDelegate == null && !skipLocShmem && !U.isWindows()) { + HadoopIgfsEx hadoop = null; + + try { + hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log, userName); + + curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); + } + catch (IOException | IgniteCheckedException e) { + if (e instanceof HadoopIgfsCommunicationException) + hadoop.close(true); + + if (log.isDebugEnabled()) + log.debug("Failed to connect to IGFS using shared memory [port=" + endpoint.port() + ']', e); + + errShmem = e; + } + } + + // 4. Try local TCP connection. + boolean skipLocTcp = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP, authority, false); + + if (curDelegate == null && !skipLocTcp) { + HadoopIgfsEx hadoop = null; + + try { + hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(), + log, userName); + + curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); + } + catch (IOException | IgniteCheckedException e) { + if (e instanceof HadoopIgfsCommunicationException) + hadoop.close(true); + + if (log.isDebugEnabled()) + log.debug("Failed to connect to IGFS using TCP [host=" + endpoint.host() + + ", port=" + endpoint.port() + ']', e); + + errTcp = e; + } + } + + // 5. Try remote TCP connection. + if (curDelegate == null && (skipLocTcp || !F.eq(LOCALHOST, endpoint.host()))) { + HadoopIgfsEx hadoop = null; + + try { + hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(), + log, userName); + + curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); + } + catch (IOException | IgniteCheckedException e) { + if (e instanceof HadoopIgfsCommunicationException) + hadoop.close(true); + + if (log.isDebugEnabled()) + log.debug("Failed to connect to IGFS using TCP [host=" + endpoint.host() + + ", port=" + endpoint.port() + ']', e); + + errTcp = e; + } + } + + if (curDelegate != null) { + if (!delegateRef.compareAndSet(null, curDelegate)) + curDelegate.doomed = true; + + return curDelegate; + } + else { + SB errMsg = new SB("Failed to connect to IGFS [endpoint=igfs://" + authority + ", attempts=["); + + if (errShmem != null) + errMsg.a("[type=SHMEM, port=" + endpoint.port() + ", err=" + errShmem + "], "); + + errMsg.a("[type=TCP, host=" + endpoint.host() + ", port=" + endpoint.port() + ", err=" + errTcp + "]] "); + + errMsg.a("(ensure that IGFS is running and have IPC endpoint enabled; ensure that " + + "ignite-shmem-1.0.0.jar is in Hadoop classpath if you use shared memory endpoint)."); + + throw new HadoopIgfsCommunicationException(errMsg.toString()); + } + } + + /** + * File system operation closure. + */ + private static interface FileSystemClosure<T> { + /** + * Call closure body. + * + * @param hadoop RPC handler. + * @param hndResp Handshake response. + * @return Result. + * @throws IgniteCheckedException If failed. + * @throws IOException If failed. + */ + public T apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException; + } + + /** + * Delegate. + */ + private static class Delegate { + /** RPC handler. */ + private final HadoopIgfsEx hadoop; + + /** Handshake request. */ + private final IgfsHandshakeResponse hndResp; + + /** Close guard. */ + private final AtomicBoolean closeGuard = new AtomicBoolean(); + + /** Whether this delegate must be closed at the end of the next invocation. */ + private boolean doomed; + + /** + * Constructor. + * + * @param hadoop Hadoop. + * @param hndResp Handshake response. + */ + private Delegate(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) { + this.hadoop = hadoop; + this.hndResp = hndResp; + } + + /** + * Close underlying RPC handler. + * + * @param force Force flag. + */ + private void close(boolean force) { + if (closeGuard.compareAndSet(false, true)) + hadoop.close(force); + } + } + + /** + * Helper method to find Igfs of the given name in the given Ignite instance. + * + * @param gridName The name of the grid to check. + * @param igfsName The name of Igfs. + * @return The file system instance, or null if not found. + */ + private static IgfsEx getIgfsEx(@Nullable String gridName, @Nullable String igfsName) { + if (Ignition.state(gridName) == STARTED) { + try { + for (IgniteFileSystem fs : Ignition.ignite(gridName).fileSystems()) { + if (F.eq(fs.name(), igfsName)) + return (IgfsEx)fs; + } + } + catch (IgniteIllegalStateException ignore) { + // May happen if the grid state has changed: + } + } + + return null; + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java new file mode 100644 index 0000000..090b336 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.jobtracker; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Collection; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; +import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; +import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase; +import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; + +import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_SETUP; + +/** + * Hadoop job metadata. Internal object used for distributed job state tracking. + */ +public class HadoopJobMetadata implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Job ID. */ + private HadoopJobId jobId; + + /** Job info. */ + private HadoopJobInfo jobInfo; + + /** Node submitted job. */ + private UUID submitNodeId; + + /** Map-reduce plan. */ + private HadoopMapReducePlan mrPlan; + + /** Pending splits for which mapper should be executed. */ + private Map<HadoopInputSplit, Integer> pendingSplits; + + /** Pending reducers. */ + private Collection<Integer> pendingReducers; + + /** Reducers addresses. */ + @GridToStringInclude + private Map<Integer, HadoopProcessDescriptor> reducersAddrs; + + /** Job phase. */ + private HadoopJobPhase phase = PHASE_SETUP; + + /** Fail cause. */ + @GridToStringExclude + private Throwable failCause; + + /** Version. */ + private long ver; + + /** Job counters */ + private HadoopCounters counters = new HadoopCountersImpl(); + + /** + * Empty constructor required by {@link Externalizable}. + */ + public HadoopJobMetadata() { + // No-op. + } + + /** + * Constructor. + * + * @param submitNodeId Submit node ID. + * @param jobId Job ID. + * @param jobInfo Job info. + */ + public HadoopJobMetadata(UUID submitNodeId, HadoopJobId jobId, HadoopJobInfo jobInfo) { + this.jobId = jobId; + this.jobInfo = jobInfo; + this.submitNodeId = submitNodeId; + } + + /** + * Copy constructor. + * + * @param src Metadata to copy. + */ + public HadoopJobMetadata(HadoopJobMetadata src) { + // Make sure to preserve alphabetic order. + counters = src.counters; + failCause = src.failCause; + jobId = src.jobId; + jobInfo = src.jobInfo; + mrPlan = src.mrPlan; + pendingSplits = src.pendingSplits; + pendingReducers = src.pendingReducers; + phase = src.phase; + reducersAddrs = src.reducersAddrs; + submitNodeId = src.submitNodeId; + ver = src.ver + 1; + } + + /** + * @return Submit node ID. + */ + public UUID submitNodeId() { + return submitNodeId; + } + + /** + * @param phase Job phase. + */ + public void phase(HadoopJobPhase phase) { + this.phase = phase; + } + + /** + * @return Job phase. + */ + public HadoopJobPhase phase() { + return phase; + } + + /** + * Gets reducers addresses for external execution. + * + * @return Reducers addresses. + */ + public Map<Integer, HadoopProcessDescriptor> reducersAddresses() { + return reducersAddrs; + } + + /** + * Sets reducers addresses for external execution. + * + * @param reducersAddrs Map of addresses. + */ + public void reducersAddresses(Map<Integer, HadoopProcessDescriptor> reducersAddrs) { + this.reducersAddrs = reducersAddrs; + } + + /** + * Sets collection of pending splits. + * + * @param pendingSplits Collection of pending splits. + */ + public void pendingSplits(Map<HadoopInputSplit, Integer> pendingSplits) { + this.pendingSplits = pendingSplits; + } + + /** + * Gets collection of pending splits. + * + * @return Collection of pending splits. + */ + public Map<HadoopInputSplit, Integer> pendingSplits() { + return pendingSplits; + } + + /** + * Sets collection of pending reducers. + * + * @param pendingReducers Collection of pending reducers. + */ + public void pendingReducers(Collection<Integer> pendingReducers) { + this.pendingReducers = pendingReducers; + } + + /** + * Gets collection of pending reducers. + * + * @return Collection of pending reducers. + */ + public Collection<Integer> pendingReducers() { + return pendingReducers; + } + + /** + * @return Job ID. + */ + public HadoopJobId jobId() { + return jobId; + } + + /** + * @param mrPlan Map-reduce plan. + */ + public void mapReducePlan(HadoopMapReducePlan mrPlan) { + assert this.mrPlan == null : "Map-reduce plan can only be initialized once."; + + this.mrPlan = mrPlan; + } + + /** + * @return Map-reduce plan. + */ + public HadoopMapReducePlan mapReducePlan() { + return mrPlan; + } + + /** + * @return Job info. + */ + public HadoopJobInfo jobInfo() { + return jobInfo; + } + + /** + * Returns job counters. + * + * @return Collection of counters. + */ + public HadoopCounters counters() { + return counters; + } + + /** + * Sets counters. + * + * @param counters Collection of counters. + */ + public void counters(HadoopCounters counters) { + this.counters = counters; + } + + /** + * @param failCause Fail cause. + */ + public void failCause(Throwable failCause) { + assert failCause != null; + + if (this.failCause == null) // Keep the first error. + this.failCause = failCause; + } + + /** + * @return Fail cause. + */ + public Throwable failCause() { + return failCause; + } + + /** + * @return Version. + */ + public long version() { + return ver; + } + + /** + * @param split Split. + * @return Task number. + */ + public int taskNumber(HadoopInputSplit split) { + return pendingSplits.get(split); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeUuid(out, submitNodeId); + out.writeObject(jobId); + out.writeObject(jobInfo); + out.writeObject(mrPlan); + out.writeObject(pendingSplits); + out.writeObject(pendingReducers); + out.writeObject(phase); + out.writeObject(failCause); + out.writeLong(ver); + out.writeObject(reducersAddrs); + out.writeObject(counters); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + submitNodeId = U.readUuid(in); + jobId = (HadoopJobId)in.readObject(); + jobInfo = (HadoopJobInfo)in.readObject(); + mrPlan = (HadoopMapReducePlan)in.readObject(); + pendingSplits = (Map<HadoopInputSplit,Integer>)in.readObject(); + pendingReducers = (Collection<Integer>)in.readObject(); + phase = (HadoopJobPhase)in.readObject(); + failCause = (Throwable)in.readObject(); + ver = in.readLong(); + reducersAddrs = (Map<Integer, HadoopProcessDescriptor>)in.readObject(); + counters = (HadoopCounters)in.readObject(); + } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(HadoopJobMetadata.class, this, "pendingMaps", pendingSplits.size(), + "pendingReduces", pendingReducers.size(), "failCause", failCause == null ? null : + failCause.getClass().getName()); + } +} \ No newline at end of file