http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java deleted file mode 100644 index fa5cbc5..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.igfs; - -import java.io.FileNotFoundException; -import java.io.IOException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.AbstractFileSystem; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.ParentNotDirectoryException; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathExistsException; -import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException; -import org.apache.ignite.igfs.IgfsParentNotDirectoryException; -import org.apache.ignite.igfs.IgfsPathAlreadyExistsException; -import org.apache.ignite.igfs.IgfsPathNotFoundException; -import org.jetbrains.annotations.Nullable; - -/** - * Utility constants and methods for IGFS Hadoop file system. - */ -public class HadoopIgfsUtils { - /** Parameter name for endpoint no embed mode flag. */ - public static final String PARAM_IGFS_ENDPOINT_NO_EMBED = "fs.igfs.%s.endpoint.no_embed"; - - /** Parameter name for endpoint no shared memory flag. */ - public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM = "fs.igfs.%s.endpoint.no_local_shmem"; - - /** Parameter name for endpoint no local TCP flag. */ - public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP = "fs.igfs.%s.endpoint.no_local_tcp"; - - /** - * Get string parameter. - * - * @param cfg Configuration. - * @param name Parameter name. - * @param authority Authority. - * @param dflt Default value. - * @return String value. - */ - public static String parameter(Configuration cfg, String name, String authority, String dflt) { - return cfg.get(String.format(name, authority != null ? authority : ""), dflt); - } - - /** - * Get integer parameter. - * - * @param cfg Configuration. - * @param name Parameter name. - * @param authority Authority. - * @param dflt Default value. - * @return Integer value. - * @throws IOException In case of parse exception. - */ - public static int parameter(Configuration cfg, String name, String authority, int dflt) throws IOException { - String name0 = String.format(name, authority != null ? authority : ""); - - try { - return cfg.getInt(name0, dflt); - } - catch (NumberFormatException ignore) { - throw new IOException("Failed to parse parameter value to integer: " + name0); - } - } - - /** - * Get boolean parameter. - * - * @param cfg Configuration. - * @param name Parameter name. - * @param authority Authority. - * @param dflt Default value. - * @return Boolean value. - */ - public static boolean parameter(Configuration cfg, String name, String authority, boolean dflt) { - return cfg.getBoolean(String.format(name, authority != null ? authority : ""), dflt); - } - - /** - * Cast Ignite exception to appropriate IO exception. - * - * @param e Exception to cast. - * @return Casted exception. - */ - public static IOException cast(IgniteCheckedException e) { - return cast(e, null); - } - - /** - * Cast Ignite exception to appropriate IO exception. - * - * @param e Exception to cast. - * @param path Path for exceptions. - * @return Casted exception. - */ - @SuppressWarnings("unchecked") - public static IOException cast(IgniteCheckedException e, @Nullable String path) { - assert e != null; - - // First check for any nested IOException; if exists - re-throw it. - if (e.hasCause(IOException.class)) - return e.getCause(IOException.class); - else if (e.hasCause(IgfsPathNotFoundException.class)) - return new FileNotFoundException(path); // TODO: Or PathNotFoundException? - else if (e.hasCause(IgfsParentNotDirectoryException.class)) - return new ParentNotDirectoryException(path); - else if (path != null && e.hasCause(IgfsDirectoryNotEmptyException.class)) - return new PathIsNotEmptyDirectoryException(path); - else if (path != null && e.hasCause(IgfsPathAlreadyExistsException.class)) - return new PathExistsException(path); - else { - String msg = e.getMessage(); - - return msg == null ? new IOException(e) : new IOException(msg, e); - } - } - - /** - * Deletes all files from the given file system. - * - * @param fs The file system to clean up. - * @throws IOException On error. - */ - public static void clear(FileSystem fs) throws IOException { - // Delete root contents: - FileStatus[] statuses = fs.listStatus(new Path("/")); - - if (statuses != null) { - for (FileStatus stat: statuses) - fs.delete(stat.getPath(), true); - } - } - - /** - * Deletes all files from the given file system. - * - * @param fs The file system to clean up. - * @throws IOException On error. - */ - public static void clear(AbstractFileSystem fs) throws IOException { - // Delete root contents: - FileStatus[] statuses = fs.listStatus(new Path("/")); - - if (statuses != null) { - for (FileStatus stat: statuses) - fs.delete(stat.getPath(), true); - } - } - - /** - * Constructor. - */ - private HadoopIgfsUtils() { - // No-op. - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java deleted file mode 100644 index f4ee97f..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java +++ /dev/null @@ -1,552 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.igfs; - -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.commons.logging.Log; -import org.apache.hadoop.conf.Configuration; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteFileSystem; -import org.apache.ignite.IgniteIllegalStateException; -import org.apache.ignite.Ignition; -import org.apache.ignite.igfs.IgfsBlockLocation; -import org.apache.ignite.igfs.IgfsFile; -import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.igfs.IgfsPathSummary; -import org.apache.ignite.internal.processors.igfs.IgfsEx; -import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse; -import org.apache.ignite.internal.processors.igfs.IgfsStatus; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.internal.util.typedef.internal.SB; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.jetbrains.annotations.Nullable; - -import static org.apache.ignite.IgniteState.STARTED; -import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint.LOCALHOST; -import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_EMBED; -import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM; -import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP; -import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.parameter; - -/** - * Wrapper for IGFS server. - */ -public class HadoopIgfsWrapper implements HadoopIgfs { - /** Delegate. */ - private final AtomicReference<Delegate> delegateRef = new AtomicReference<>(); - - /** Authority. */ - private final String authority; - - /** Connection string. */ - private final HadoopIgfsEndpoint endpoint; - - /** Log directory. */ - private final String logDir; - - /** Configuration. */ - private final Configuration conf; - - /** Logger. */ - private final Log log; - - /** The user name this wrapper works on behalf of. */ - private final String userName; - - /** - * Constructor. - * - * @param authority Authority (connection string). - * @param logDir Log directory for server. - * @param conf Configuration. - * @param log Current logger. - */ - public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log, String user) - throws IOException { - try { - this.authority = authority; - this.endpoint = new HadoopIgfsEndpoint(authority); - this.logDir = logDir; - this.conf = conf; - this.log = log; - this.userName = user; - } - catch (IgniteCheckedException e) { - throw new IOException("Failed to parse endpoint: " + authority, e); - } - } - - /** {@inheritDoc} */ - @Override public IgfsHandshakeResponse handshake(String logDir) throws IOException { - return withReconnectHandling(new FileSystemClosure<IgfsHandshakeResponse>() { - @Override public IgfsHandshakeResponse apply(HadoopIgfsEx hadoop, - IgfsHandshakeResponse hndResp) { - return hndResp; - } - }); - } - - /** {@inheritDoc} */ - @Override public void close(boolean force) { - Delegate delegate = delegateRef.get(); - - if (delegate != null && delegateRef.compareAndSet(delegate, null)) - delegate.close(force); - } - - /** {@inheritDoc} */ - @Override public IgfsFile info(final IgfsPath path) throws IOException { - return withReconnectHandling(new FileSystemClosure<IgfsFile>() { - @Override public IgfsFile apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.info(path); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IOException { - return withReconnectHandling(new FileSystemClosure<IgfsFile>() { - @Override public IgfsFile apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.update(path, props); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime) - throws IOException { - return withReconnectHandling(new FileSystemClosure<Boolean>() { - @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.setTimes(path, accessTime, modificationTime); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IOException { - return withReconnectHandling(new FileSystemClosure<Boolean>() { - @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.rename(src, dest); - } - }, src); - } - - /** {@inheritDoc} */ - @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IOException { - return withReconnectHandling(new FileSystemClosure<Boolean>() { - @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.delete(path, recursive); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start, - final long len) throws IOException { - return withReconnectHandling(new FileSystemClosure<Collection<IgfsBlockLocation>>() { - @Override public Collection<IgfsBlockLocation> apply(HadoopIgfsEx hadoop, - IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { - return hadoop.affinity(path, start, len); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IOException { - return withReconnectHandling(new FileSystemClosure<IgfsPathSummary>() { - @Override public IgfsPathSummary apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.contentSummary(path); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IOException { - return withReconnectHandling(new FileSystemClosure<Boolean>() { - @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.mkdirs(path, props); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IOException { - return withReconnectHandling(new FileSystemClosure<Collection<IgfsFile>>() { - @Override public Collection<IgfsFile> apply(HadoopIgfsEx hadoop, - IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { - return hadoop.listFiles(path); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IOException { - return withReconnectHandling(new FileSystemClosure<Collection<IgfsPath>>() { - @Override public Collection<IgfsPath> apply(HadoopIgfsEx hadoop, - IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { - return hadoop.listPaths(path); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public IgfsStatus fsStatus() throws IOException { - return withReconnectHandling(new FileSystemClosure<IgfsStatus>() { - @Override public IgfsStatus apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.fsStatus(); - } - }); - } - - /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IOException { - return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() { - @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop, - IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { - return hadoop.open(path); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch) - throws IOException { - return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() { - @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop, - IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { - return hadoop.open(path, seqReadsBeforePrefetch); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite, - final boolean colocate, final int replication, final long blockSize, @Nullable final Map<String, String> props) - throws IOException { - return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() { - @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop, - IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { - return hadoop.create(path, overwrite, colocate, replication, blockSize, props); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create, - @Nullable final Map<String, String> props) throws IOException { - return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() { - @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop, - IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { - return hadoop.append(path, create, props); - } - }, path); - } - - /** - * Execute closure which is not path-specific. - * - * @param clo Closure. - * @return Result. - * @throws IOException If failed. - */ - private <T> T withReconnectHandling(FileSystemClosure<T> clo) throws IOException { - return withReconnectHandling(clo, null); - } - - /** - * Execute closure. - * - * @param clo Closure. - * @param path Path for exceptions. - * @return Result. - * @throws IOException If failed. - */ - private <T> T withReconnectHandling(final FileSystemClosure<T> clo, @Nullable IgfsPath path) - throws IOException { - Exception err = null; - - for (int i = 0; i < 2; i++) { - Delegate curDelegate = null; - - boolean close = false; - boolean force = false; - - try { - curDelegate = delegate(); - - assert curDelegate != null; - - close = curDelegate.doomed; - - return clo.apply(curDelegate.hadoop, curDelegate.hndResp); - } - catch (HadoopIgfsCommunicationException e) { - if (curDelegate != null && !curDelegate.doomed) { - // Try getting rid fo faulty delegate ASAP. - delegateRef.compareAndSet(curDelegate, null); - - close = true; - force = true; - } - - if (log.isDebugEnabled()) - log.debug("Failed to send message to a server: " + e); - - err = e; - } - catch (IgniteCheckedException e) { - throw HadoopIgfsUtils.cast(e, path != null ? path.toString() : null); - } - finally { - if (close) { - assert curDelegate != null; - - curDelegate.close(force); - } - } - } - - List<Throwable> list = X.getThrowableList(err); - - Throwable cause = list.get(list.size() - 1); - - throw new IOException("Failed to communicate with IGFS: " - + (cause.getMessage() == null ? cause.toString() : cause.getMessage()), err); - } - - /** - * Get delegate creating it if needed. - * - * @return Delegate. - */ - private Delegate delegate() throws HadoopIgfsCommunicationException { - // These fields will contain possible exceptions from shmem and TCP endpoints. - Exception errShmem = null; - Exception errTcp = null; - - // 1. If delegate is set, return it immediately. - Delegate curDelegate = delegateRef.get(); - - if (curDelegate != null) - return curDelegate; - - // 2. Guess that we are in the same VM. - boolean skipInProc = parameter(conf, PARAM_IGFS_ENDPOINT_NO_EMBED, authority, false); - - if (!skipInProc) { - IgfsEx igfs = getIgfsEx(endpoint.grid(), endpoint.igfs()); - - if (igfs != null) { - HadoopIgfsEx hadoop = null; - - try { - hadoop = new HadoopIgfsInProc(igfs, log, userName); - - curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); - } - catch (IOException | IgniteCheckedException e) { - if (e instanceof HadoopIgfsCommunicationException) - if (hadoop != null) - hadoop.close(true); - - if (log.isDebugEnabled()) - log.debug("Failed to connect to in-process IGFS, fallback to IPC mode.", e); - } - } - } - - // 3. Try connecting using shmem. - boolean skipLocShmem = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority, false); - - if (curDelegate == null && !skipLocShmem && !U.isWindows()) { - HadoopIgfsEx hadoop = null; - - try { - hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log, userName); - - curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); - } - catch (IOException | IgniteCheckedException e) { - if (e instanceof HadoopIgfsCommunicationException) - hadoop.close(true); - - if (log.isDebugEnabled()) - log.debug("Failed to connect to IGFS using shared memory [port=" + endpoint.port() + ']', e); - - errShmem = e; - } - } - - // 4. Try local TCP connection. - boolean skipLocTcp = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP, authority, false); - - if (curDelegate == null && !skipLocTcp) { - HadoopIgfsEx hadoop = null; - - try { - hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(), - log, userName); - - curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); - } - catch (IOException | IgniteCheckedException e) { - if (e instanceof HadoopIgfsCommunicationException) - hadoop.close(true); - - if (log.isDebugEnabled()) - log.debug("Failed to connect to IGFS using TCP [host=" + endpoint.host() + - ", port=" + endpoint.port() + ']', e); - - errTcp = e; - } - } - - // 5. Try remote TCP connection. - if (curDelegate == null && (skipLocTcp || !F.eq(LOCALHOST, endpoint.host()))) { - HadoopIgfsEx hadoop = null; - - try { - hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(), - log, userName); - - curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); - } - catch (IOException | IgniteCheckedException e) { - if (e instanceof HadoopIgfsCommunicationException) - hadoop.close(true); - - if (log.isDebugEnabled()) - log.debug("Failed to connect to IGFS using TCP [host=" + endpoint.host() + - ", port=" + endpoint.port() + ']', e); - - errTcp = e; - } - } - - if (curDelegate != null) { - if (!delegateRef.compareAndSet(null, curDelegate)) - curDelegate.doomed = true; - - return curDelegate; - } - else { - SB errMsg = new SB("Failed to connect to IGFS [endpoint=igfs://" + authority + ", attempts=["); - - if (errShmem != null) - errMsg.a("[type=SHMEM, port=" + endpoint.port() + ", err=" + errShmem + "], "); - - errMsg.a("[type=TCP, host=" + endpoint.host() + ", port=" + endpoint.port() + ", err=" + errTcp + "]] "); - - errMsg.a("(ensure that IGFS is running and have IPC endpoint enabled; ensure that " + - "ignite-shmem-1.0.0.jar is in Hadoop classpath if you use shared memory endpoint)."); - - throw new HadoopIgfsCommunicationException(errMsg.toString()); - } - } - - /** - * File system operation closure. - */ - private static interface FileSystemClosure<T> { - /** - * Call closure body. - * - * @param hadoop RPC handler. - * @param hndResp Handshake response. - * @return Result. - * @throws IgniteCheckedException If failed. - * @throws IOException If failed. - */ - public T apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException; - } - - /** - * Delegate. - */ - private static class Delegate { - /** RPC handler. */ - private final HadoopIgfsEx hadoop; - - /** Handshake request. */ - private final IgfsHandshakeResponse hndResp; - - /** Close guard. */ - private final AtomicBoolean closeGuard = new AtomicBoolean(); - - /** Whether this delegate must be closed at the end of the next invocation. */ - private boolean doomed; - - /** - * Constructor. - * - * @param hadoop Hadoop. - * @param hndResp Handshake response. - */ - private Delegate(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) { - this.hadoop = hadoop; - this.hndResp = hndResp; - } - - /** - * Close underlying RPC handler. - * - * @param force Force flag. - */ - private void close(boolean force) { - if (closeGuard.compareAndSet(false, true)) - hadoop.close(force); - } - } - - /** - * Helper method to find Igfs of the given name in the given Ignite instance. - * - * @param gridName The name of the grid to check. - * @param igfsName The name of Igfs. - * @return The file system instance, or null if not found. - */ - private static IgfsEx getIgfsEx(@Nullable String gridName, @Nullable String igfsName) { - if (Ignition.state(gridName) == STARTED) { - try { - for (IgniteFileSystem fs : Ignition.ignite(gridName).fileSystems()) { - if (F.eq(fs.name(), igfsName)) - return (IgfsEx)fs; - } - } - catch (IgniteIllegalStateException ignore) { - // May happen if the grid state has changed: - } - } - - return null; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java deleted file mode 100644 index 090b336..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java +++ /dev/null @@ -1,316 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.jobtracker; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.Collection; -import java.util.Map; -import java.util.UUID; -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; - -import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_SETUP; - -/** - * Hadoop job metadata. Internal object used for distributed job state tracking. - */ -public class HadoopJobMetadata implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Job ID. */ - private HadoopJobId jobId; - - /** Job info. */ - private HadoopJobInfo jobInfo; - - /** Node submitted job. */ - private UUID submitNodeId; - - /** Map-reduce plan. */ - private HadoopMapReducePlan mrPlan; - - /** Pending splits for which mapper should be executed. */ - private Map<HadoopInputSplit, Integer> pendingSplits; - - /** Pending reducers. */ - private Collection<Integer> pendingReducers; - - /** Reducers addresses. */ - @GridToStringInclude - private Map<Integer, HadoopProcessDescriptor> reducersAddrs; - - /** Job phase. */ - private HadoopJobPhase phase = PHASE_SETUP; - - /** Fail cause. */ - @GridToStringExclude - private Throwable failCause; - - /** Version. */ - private long ver; - - /** Job counters */ - private HadoopCounters counters = new HadoopCountersImpl(); - - /** - * Empty constructor required by {@link Externalizable}. - */ - public HadoopJobMetadata() { - // No-op. - } - - /** - * Constructor. - * - * @param submitNodeId Submit node ID. - * @param jobId Job ID. - * @param jobInfo Job info. - */ - public HadoopJobMetadata(UUID submitNodeId, HadoopJobId jobId, HadoopJobInfo jobInfo) { - this.jobId = jobId; - this.jobInfo = jobInfo; - this.submitNodeId = submitNodeId; - } - - /** - * Copy constructor. - * - * @param src Metadata to copy. - */ - public HadoopJobMetadata(HadoopJobMetadata src) { - // Make sure to preserve alphabetic order. - counters = src.counters; - failCause = src.failCause; - jobId = src.jobId; - jobInfo = src.jobInfo; - mrPlan = src.mrPlan; - pendingSplits = src.pendingSplits; - pendingReducers = src.pendingReducers; - phase = src.phase; - reducersAddrs = src.reducersAddrs; - submitNodeId = src.submitNodeId; - ver = src.ver + 1; - } - - /** - * @return Submit node ID. - */ - public UUID submitNodeId() { - return submitNodeId; - } - - /** - * @param phase Job phase. - */ - public void phase(HadoopJobPhase phase) { - this.phase = phase; - } - - /** - * @return Job phase. - */ - public HadoopJobPhase phase() { - return phase; - } - - /** - * Gets reducers addresses for external execution. - * - * @return Reducers addresses. - */ - public Map<Integer, HadoopProcessDescriptor> reducersAddresses() { - return reducersAddrs; - } - - /** - * Sets reducers addresses for external execution. - * - * @param reducersAddrs Map of addresses. - */ - public void reducersAddresses(Map<Integer, HadoopProcessDescriptor> reducersAddrs) { - this.reducersAddrs = reducersAddrs; - } - - /** - * Sets collection of pending splits. - * - * @param pendingSplits Collection of pending splits. - */ - public void pendingSplits(Map<HadoopInputSplit, Integer> pendingSplits) { - this.pendingSplits = pendingSplits; - } - - /** - * Gets collection of pending splits. - * - * @return Collection of pending splits. - */ - public Map<HadoopInputSplit, Integer> pendingSplits() { - return pendingSplits; - } - - /** - * Sets collection of pending reducers. - * - * @param pendingReducers Collection of pending reducers. - */ - public void pendingReducers(Collection<Integer> pendingReducers) { - this.pendingReducers = pendingReducers; - } - - /** - * Gets collection of pending reducers. - * - * @return Collection of pending reducers. - */ - public Collection<Integer> pendingReducers() { - return pendingReducers; - } - - /** - * @return Job ID. - */ - public HadoopJobId jobId() { - return jobId; - } - - /** - * @param mrPlan Map-reduce plan. - */ - public void mapReducePlan(HadoopMapReducePlan mrPlan) { - assert this.mrPlan == null : "Map-reduce plan can only be initialized once."; - - this.mrPlan = mrPlan; - } - - /** - * @return Map-reduce plan. - */ - public HadoopMapReducePlan mapReducePlan() { - return mrPlan; - } - - /** - * @return Job info. - */ - public HadoopJobInfo jobInfo() { - return jobInfo; - } - - /** - * Returns job counters. - * - * @return Collection of counters. - */ - public HadoopCounters counters() { - return counters; - } - - /** - * Sets counters. - * - * @param counters Collection of counters. - */ - public void counters(HadoopCounters counters) { - this.counters = counters; - } - - /** - * @param failCause Fail cause. - */ - public void failCause(Throwable failCause) { - assert failCause != null; - - if (this.failCause == null) // Keep the first error. - this.failCause = failCause; - } - - /** - * @return Fail cause. - */ - public Throwable failCause() { - return failCause; - } - - /** - * @return Version. - */ - public long version() { - return ver; - } - - /** - * @param split Split. - * @return Task number. - */ - public int taskNumber(HadoopInputSplit split) { - return pendingSplits.get(split); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeUuid(out, submitNodeId); - out.writeObject(jobId); - out.writeObject(jobInfo); - out.writeObject(mrPlan); - out.writeObject(pendingSplits); - out.writeObject(pendingReducers); - out.writeObject(phase); - out.writeObject(failCause); - out.writeLong(ver); - out.writeObject(reducersAddrs); - out.writeObject(counters); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - submitNodeId = U.readUuid(in); - jobId = (HadoopJobId)in.readObject(); - jobInfo = (HadoopJobInfo)in.readObject(); - mrPlan = (HadoopMapReducePlan)in.readObject(); - pendingSplits = (Map<HadoopInputSplit,Integer>)in.readObject(); - pendingReducers = (Collection<Integer>)in.readObject(); - phase = (HadoopJobPhase)in.readObject(); - failCause = (Throwable)in.readObject(); - ver = in.readLong(); - reducersAddrs = (Map<Integer, HadoopProcessDescriptor>)in.readObject(); - counters = (HadoopCounters)in.readObject(); - } - - /** {@inheritDoc} */ - public String toString() { - return S.toString(HadoopJobMetadata.class, this, "pendingMaps", pendingSplits.size(), - "pendingReduces", pendingReducers.size(), "failCause", failCause == null ? null : - failCause.getClass().getName()); - } -} \ No newline at end of file