Repository: incubator-ignite Updated Branches: refs/heads/ignite-226 4daa02139 -> 9cf92dd53
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamDelegate.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamDelegate.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamDelegate.java new file mode 100644 index 0000000..fdf5a0a --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamDelegate.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.ignite.internal.util.typedef.internal.*; + +/** + * GGFS Hadoop stream descriptor. + */ +public class IgfsHadoopStreamDelegate { + /** RPC handler. */ + private final IgfsHadoopEx hadoop; + + /** Target. */ + private final Object target; + + /** Optional stream length. */ + private final long len; + + /** + * Constructor. + * + * @param target Target. 
+ */ + public IgfsHadoopStreamDelegate(IgfsHadoopEx hadoop, Object target) { + this(hadoop, target, -1); + } + + /** + * Constructor. + * + * @param target Target. + * @param len Optional length. + */ + public IgfsHadoopStreamDelegate(IgfsHadoopEx hadoop, Object target, long len) { + assert hadoop != null; + assert target != null; + + this.hadoop = hadoop; + this.target = target; + this.len = len; + } + + /** + * @return RPC handler. + */ + public IgfsHadoopEx hadoop() { + return hadoop; + } + + /** + * @return Stream target. + */ + @SuppressWarnings("unchecked") + public <T> T target() { + return (T) target; + } + + /** + * @return Length. + */ + public long length() { + return len; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return System.identityHashCode(target); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + return obj != null && obj instanceof IgfsHadoopStreamDelegate && + target == ((IgfsHadoopStreamDelegate)obj).target; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgfsHadoopStreamDelegate.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamEventListener.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamEventListener.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamEventListener.java new file mode 100644 index 0000000..4137685 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamEventListener.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.ignite.*; + +/** + * GGFS input stream event listener. + */ +public interface IgfsHadoopStreamEventListener { + /** + * Callback invoked when the stream is being closed. + * + * @throws IgniteCheckedException If failed. + */ + public void onClose() throws IgniteCheckedException; + + /** + * Callback invoked when remote error occurs. + * + * @param errMsg Error message. + */ + public void onError(String errMsg); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopUtils.java new file mode 100644 index 0000000..728e3c2 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopUtils.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.ignite.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.processors.fs.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Utility constants and methods for GGFS Hadoop file system. + */ +public class IgfsHadoopUtils { + /** Parameter name for endpoint no embed mode flag. */ + public static final String PARAM_GGFS_ENDPOINT_NO_EMBED = "fs.ggfs.%s.endpoint.no_embed"; + + /** Parameter name for endpoint no shared memory flag. */ + public static final String PARAM_GGFS_ENDPOINT_NO_LOCAL_SHMEM = "fs.ggfs.%s.endpoint.no_local_shmem"; + + /** Parameter name for endpoint no local TCP flag. */ + public static final String PARAM_GGFS_ENDPOINT_NO_LOCAL_TCP = "fs.ggfs.%s.endpoint.no_local_tcp"; + + /** + * Get string parameter. + * + * @param cfg Configuration. + * @param name Parameter name. + * @param authority Authority. + * @param dflt Default value. + * @return String value. + */ + public static String parameter(Configuration cfg, String name, String authority, String dflt) { + return cfg.get(String.format(name, authority != null ? authority : ""), dflt); + } + + /** + * Get integer parameter. + * + * @param cfg Configuration. + * @param name Parameter name. + * @param authority Authority. 
+ * @param dflt Default value. + * @return Integer value. + * @throws IOException In case of parse exception. + */ + public static int parameter(Configuration cfg, String name, String authority, int dflt) throws IOException { + String name0 = String.format(name, authority != null ? authority : ""); + + try { + return cfg.getInt(name0, dflt); + } + catch (NumberFormatException ignore) { + throw new IOException("Failed to parse parameter value to integer: " + name0); + } + } + + /** + * Get boolean parameter. + * + * @param cfg Configuration. + * @param name Parameter name. + * @param authority Authority. + * @param dflt Default value. + * @return Boolean value. + */ + public static boolean parameter(Configuration cfg, String name, String authority, boolean dflt) { + return cfg.getBoolean(String.format(name, authority != null ? authority : ""), dflt); + } + + /** + * Cast GG exception to appropriate IO exception. + * + * @param e Exception to cast. + * @return Casted exception. + */ + public static IOException cast(IgniteCheckedException e) { + return cast(e, null); + } + + /** + * Cast GG exception to appropriate IO exception. + * + * @param e Exception to cast. + * @param path Path for exceptions. + * @return Casted exception. + */ + @SuppressWarnings("unchecked") + public static IOException cast(IgniteCheckedException e, @Nullable String path) { + assert e != null; + + // First check for any nested IOException; if exists - re-throw it. + if (e.hasCause(IOException.class)) + return e.getCause(IOException.class); + else if (e.hasCause(IgfsFileNotFoundException.class)) + return new FileNotFoundException(path); // TODO: Or PathNotFoundException? 
+ else if (e.hasCause(IgfsParentNotDirectoryException.class)) + return new ParentNotDirectoryException(path); + else if (path != null && e.hasCause(IgfsDirectoryNotEmptyException.class)) + return new PathIsNotEmptyDirectoryException(path); + else if (path != null && e.hasCause(IgfsPathAlreadyExistsException.class)) + return new PathExistsException(path); + else + return new IOException(e); + } + + /** + * Constructor. + */ + private IgfsHadoopUtils() { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopWrapper.java new file mode 100644 index 0000000..d72a4aa --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopWrapper.java @@ -0,0 +1,511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.commons.logging.*; +import org.apache.hadoop.conf.*; +import org.apache.ignite.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.processors.fs.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.internal.igfs.hadoop.IgfsHadoopEndpoint.*; +import static org.apache.ignite.internal.igfs.hadoop.IgfsHadoopUtils.*; + +/** + * Wrapper for GGFS server. + */ +public class IgfsHadoopWrapper implements IgfsHadoop { + /** Delegate. */ + private final AtomicReference<Delegate> delegateRef = new AtomicReference<>(); + + /** Authority. */ + private final String authority; + + /** Connection string. */ + private final IgfsHadoopEndpoint endpoint; + + /** Log directory. */ + private final String logDir; + + /** Configuration. */ + private final Configuration conf; + + /** Logger. */ + private final Log log; + + /** + * Constructor. + * + * @param authority Authority (connection string). + * @param logDir Log directory for server. + * @param conf Configuration. + * @param log Current logger. 
+ */ + public IgfsHadoopWrapper(String authority, String logDir, Configuration conf, Log log) throws IOException { + try { + this.authority = authority; + this.endpoint = new IgfsHadoopEndpoint(authority); + this.logDir = logDir; + this.conf = conf; + this.log = log; + } + catch (IgniteCheckedException e) { + throw new IOException("Failed to parse endpoint: " + authority, e); + } + } + + /** {@inheritDoc} */ + @Override public IgfsHandshakeResponse handshake(String logDir) throws IOException { + return withReconnectHandling(new FileSystemClosure<IgfsHandshakeResponse>() { + @Override public IgfsHandshakeResponse apply(IgfsHadoopEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hndResp; + } + }); + } + + /** {@inheritDoc} */ + @Override public void close(boolean force) { + Delegate delegate = delegateRef.get(); + + if (delegate != null && delegateRef.compareAndSet(delegate, null)) + delegate.close(force); + } + + /** {@inheritDoc} */ + @Override public IgfsFile info(final IgfsPath path) throws IOException { + return withReconnectHandling(new FileSystemClosure<IgfsFile>() { + @Override public IgfsFile apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.info(path); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IOException { + return withReconnectHandling(new FileSystemClosure<IgfsFile>() { + @Override public IgfsFile apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.update(path, props); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime) + throws IOException { + return withReconnectHandling(new FileSystemClosure<Boolean>() { + @Override public Boolean apply(IgfsHadoopEx hadoop, 
IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.setTimes(path, accessTime, modificationTime); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IOException { + return withReconnectHandling(new FileSystemClosure<Boolean>() { + @Override public Boolean apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.rename(src, dest); + } + }, src); + } + + /** {@inheritDoc} */ + @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IOException { + return withReconnectHandling(new FileSystemClosure<Boolean>() { + @Override public Boolean apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.delete(path, recursive); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start, + final long len) throws IOException { + return withReconnectHandling(new FileSystemClosure<Collection<IgfsBlockLocation>>() { + @Override public Collection<IgfsBlockLocation> apply(IgfsHadoopEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.affinity(path, start, len); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IOException { + return withReconnectHandling(new FileSystemClosure<IgfsPathSummary>() { + @Override public IgfsPathSummary apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.contentSummary(path); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IOException { + return withReconnectHandling(new FileSystemClosure<Boolean>() { + @Override public 
Boolean apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.mkdirs(path, props); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IOException { + return withReconnectHandling(new FileSystemClosure<Collection<IgfsFile>>() { + @Override public Collection<IgfsFile> apply(IgfsHadoopEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.listFiles(path); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IOException { + return withReconnectHandling(new FileSystemClosure<Collection<IgfsPath>>() { + @Override public Collection<IgfsPath> apply(IgfsHadoopEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.listPaths(path); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public IgfsStatus fsStatus() throws IOException { + return withReconnectHandling(new FileSystemClosure<IgfsStatus>() { + @Override public IgfsStatus apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.fsStatus(); + } + }); + } + + /** {@inheritDoc} */ + @Override public IgfsHadoopStreamDelegate open(final IgfsPath path) throws IOException { + return withReconnectHandling(new FileSystemClosure<IgfsHadoopStreamDelegate>() { + @Override public IgfsHadoopStreamDelegate apply(IgfsHadoopEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.open(path); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public IgfsHadoopStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch) + throws IOException { + return withReconnectHandling(new FileSystemClosure<IgfsHadoopStreamDelegate>() { + @Override public IgfsHadoopStreamDelegate apply(IgfsHadoopEx hadoop, + 
IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.open(path, seqReadsBeforePrefetch); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public IgfsHadoopStreamDelegate create(final IgfsPath path, final boolean overwrite, + final boolean colocate, final int replication, final long blockSize, @Nullable final Map<String, String> props) + throws IOException { + return withReconnectHandling(new FileSystemClosure<IgfsHadoopStreamDelegate>() { + @Override public IgfsHadoopStreamDelegate apply(IgfsHadoopEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.create(path, overwrite, colocate, replication, blockSize, props); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public IgfsHadoopStreamDelegate append(final IgfsPath path, final boolean create, + @Nullable final Map<String, String> props) throws IOException { + return withReconnectHandling(new FileSystemClosure<IgfsHadoopStreamDelegate>() { + @Override public IgfsHadoopStreamDelegate apply(IgfsHadoopEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.append(path, create, props); + } + }, path); + } + + /** + * Execute closure which is not path-specific. + * + * @param clo Closure. + * @return Result. + * @throws IOException If failed. + */ + private <T> T withReconnectHandling(FileSystemClosure<T> clo) throws IOException { + return withReconnectHandling(clo, null); + } + + /** + * Execute closure. + * + * @param clo Closure. + * @param path Path for exceptions. + * @return Result. + * @throws IOException If failed. 
+ */ + private <T> T withReconnectHandling(final FileSystemClosure<T> clo, @Nullable IgfsPath path) + throws IOException { + Exception err = null; + + for (int i = 0; i < 2; i++) { + Delegate curDelegate = null; + + boolean close = false; + boolean force = false; + + try { + curDelegate = delegate(); + + assert curDelegate != null; + + close = curDelegate.doomed; + + return clo.apply(curDelegate.hadoop, curDelegate.hndResp); + } + catch (IgfsHadoopCommunicationException e) { + if (curDelegate != null && !curDelegate.doomed) { + // Try getting rid of faulty delegate ASAP. + delegateRef.compareAndSet(curDelegate, null); + + close = true; + force = true; + } + + if (log.isDebugEnabled()) + log.debug("Failed to send message to a server: " + e); + + err = e; + } + catch (IgniteCheckedException e) { + throw IgfsHadoopUtils.cast(e, path != null ? path.toString() : null); + } + finally { + if (close) { + assert curDelegate != null; + + curDelegate.close(force); + } + } + } + + throw new IOException("Failed to communicate with GGFS.", err); + } + + /** + * Get delegate creating it if needed. + * + * @return Delegate. + */ + private Delegate delegate() throws IgfsHadoopCommunicationException { + Exception err = null; + + // 1. If delegate is set, return it immediately. + Delegate curDelegate = delegateRef.get(); + + if (curDelegate != null) + return curDelegate; + + // 2. Guess that we are in the same VM. 
+ if (!parameter(conf, PARAM_GGFS_ENDPOINT_NO_EMBED, authority, false)) { + IgfsEx ggfs = null; + + if (endpoint.grid() == null) { + try { + Ignite ignite = G.ignite(); + + ggfs = (IgfsEx)ignite.fileSystem(endpoint.ggfs()); + } + catch (Exception e) { + err = e; + } + } + else { + for (Ignite ignite : G.allGrids()) { + try { + ggfs = (IgfsEx)ignite.fileSystem(endpoint.ggfs()); + + break; + } + catch (Exception e) { + err = e; + } + } + } + + if (ggfs != null) { + IgfsHadoopEx hadoop = null; + + try { + hadoop = new IgfsHadoopInProc(ggfs, log); + + curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); + } + catch (IOException | IgniteCheckedException e) { + if (e instanceof IgfsHadoopCommunicationException) + hadoop.close(true); + + if (log.isDebugEnabled()) + log.debug("Failed to connect to in-proc GGFS, fallback to IPC mode.", e); + + err = e; + } + } + } + + // 3. Try connecting using shmem. + if (!parameter(conf, PARAM_GGFS_ENDPOINT_NO_LOCAL_SHMEM, authority, false)) { + if (curDelegate == null && !U.isWindows()) { + IgfsHadoopEx hadoop = null; + + try { + hadoop = new IgfsHadoopOutProc(endpoint.port(), endpoint.grid(), endpoint.ggfs(), log); + + curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); + } + catch (IOException | IgniteCheckedException e) { + if (e instanceof IgfsHadoopCommunicationException) + hadoop.close(true); + + if (log.isDebugEnabled()) + log.debug("Failed to connect to out-proc local GGFS using shmem.", e); + + err = e; + } + } + } + + // 4. Try local TCP connection. 
+ boolean skipLocTcp = parameter(conf, PARAM_GGFS_ENDPOINT_NO_LOCAL_TCP, authority, false); + + if (!skipLocTcp) { + if (curDelegate == null) { + IgfsHadoopEx hadoop = null; + + try { + hadoop = new IgfsHadoopOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.ggfs(), + log); + + curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); + } + catch (IOException | IgniteCheckedException e) { + if (e instanceof IgfsHadoopCommunicationException) + hadoop.close(true); + + if (log.isDebugEnabled()) + log.debug("Failed to connect to out-proc local GGFS using TCP.", e); + + err = e; + } + } + } + + // 5. Try remote TCP connection. + if (curDelegate == null && (skipLocTcp || !F.eq(LOCALHOST, endpoint.host()))) { + IgfsHadoopEx hadoop = null; + + try { + hadoop = new IgfsHadoopOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.ggfs(), log); + + curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); + } + catch (IOException | IgniteCheckedException e) { + if (e instanceof IgfsHadoopCommunicationException) + hadoop.close(true); + + if (log.isDebugEnabled()) + log.debug("Failed to connect to out-proc remote GGFS using TCP.", e); + + err = e; + } + } + + if (curDelegate != null) { + if (!delegateRef.compareAndSet(null, curDelegate)) + curDelegate.doomed = true; + + return curDelegate; + } + else + throw new IgfsHadoopCommunicationException("Failed to connect to GGFS: " + endpoint, err); + } + + /** + * File system operation closure. + */ + private static interface FileSystemClosure<T> { + /** + * Call closure body. + * + * @param hadoop RPC handler. + * @param hndResp Handshake response. + * @return Result. + * @throws IgniteCheckedException If failed. + * @throws IOException If failed. + */ + public T apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException; + } + + /** + * Delegate. + */ + private static class Delegate { + /** RPC handler. 
*/ + private final IgfsHadoopEx hadoop; + + /** Handshake response. */ + private final IgfsHandshakeResponse hndResp; + + /** Close guard. */ + private final AtomicBoolean closeGuard = new AtomicBoolean(); + + /** Whether this delegate must be closed at the end of the next invocation. */ + private boolean doomed; + + /** + * Constructor. + * + * @param hadoop Hadoop. + * @param hndResp Handshake response. + */ + private Delegate(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) { + this.hadoop = hadoop; + this.hndResp = hndResp; + } + + /** + * Close underlying RPC handler. + * + * @param force Force flag. + */ + private void close(boolean force) { + if (closeGuard.compareAndSet(false, true)) + hadoop.close(force); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/package.html ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/package.html b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/package.html new file mode 100644 index 0000000..4520df8 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/package.html @@ -0,0 +1,24 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. +--> + +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <!-- Package description. --> + Contains GGFS client classes. +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/package.html ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/package.html b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/package.html new file mode 100644 index 0000000..4f47151 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/package.html @@ -0,0 +1,24 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <!-- Package description. --> + Contains GGFS client and common classes. 
+</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java index 44a8e9b..abe8aa1 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java @@ -21,7 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.igfs.*; import org.apache.ignite.internal.*; -import org.apache.ignite.internal.fs.hadoop.*; +import org.apache.ignite.internal.igfs.hadoop.*; import org.apache.ignite.internal.processors.fs.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.util.typedef.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java index c513ebd..8f140ff 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java @@ -20,7 +20,7 @@ package org.apache.ignite.igfs; import junit.framework.*; import org.apache.ignite.*; import org.apache.ignite.configuration.*; -import 
org.apache.ignite.internal.fs.hadoop.*; +import org.apache.ignite.internal.igfs.hadoop.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.util.ipc.shmem.*; import org.apache.ignite.internal.util.typedef.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoop20FileSystemAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoop20FileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoop20FileSystemAbstractSelfTest.java index f4d758e..f1d1268 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoop20FileSystemAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoop20FileSystemAbstractSelfTest.java @@ -24,7 +24,7 @@ import org.apache.hadoop.fs.permission.*; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.fs.hadoop.*; +import org.apache.ignite.internal.igfs.hadoop.*; import org.apache.ignite.internal.processors.fs.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopDualAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopDualAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopDualAbstractSelfTest.java index 3c6d000..6e55d59 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopDualAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopDualAbstractSelfTest.java @@ -23,7 +23,7 @@ import 
org.apache.hadoop.fs.FileSystem; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.fs.hadoop.*; +import org.apache.ignite.internal.igfs.hadoop.*; import org.apache.ignite.internal.processors.fs.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemAbstractSelfTest.java index 577b910..b7c6b05 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemAbstractSelfTest.java @@ -25,7 +25,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; import org.apache.ignite.igfs.hadoop.v1.IgfsHadoopFileSystem; -import org.apache.ignite.internal.fs.hadoop.*; +import org.apache.ignite.internal.igfs.hadoop.*; import org.apache.ignite.internal.processors.fs.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.lang.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemClientSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemClientSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemClientSelfTest.java index 0d9c740..1dc8ff7 100644 --- 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemClientSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemClientSelfTest.java @@ -20,8 +20,8 @@ package org.apache.ignite.igfs; import org.apache.commons.logging.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; -import org.apache.ignite.internal.fs.common.*; -import org.apache.ignite.internal.fs.hadoop.*; +import org.apache.ignite.internal.igfs.common.*; +import org.apache.ignite.internal.igfs.hadoop.*; import org.apache.ignite.internal.processors.fs.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemHandshakeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemHandshakeSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemHandshakeSelfTest.java index 56e117e..ca82d39 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemHandshakeSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemHandshakeSelfTest.java @@ -41,7 +41,7 @@ import static org.apache.ignite.cache.CacheDistributionMode.*; import static org.apache.ignite.cache.CacheMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; import static org.apache.ignite.igfs.IgfsMode.*; -import static org.apache.ignite.internal.fs.hadoop.IgfsHadoopUtils.*; +import static org.apache.ignite.internal.igfs.hadoop.IgfsHadoopUtils.*; import static org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint.*; /** 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemIpcCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemIpcCacheSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemIpcCacheSelfTest.java index 6d2739b..84340f6 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemIpcCacheSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemIpcCacheSelfTest.java @@ -21,7 +21,7 @@ import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.fs.hadoop.*; +import org.apache.ignite.internal.igfs.hadoop.*; import org.apache.ignite.internal.processors.fs.*; import org.apache.ignite.internal.util.ipc.shmem.*; import org.apache.ignite.internal.util.typedef.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemLoggerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemLoggerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemLoggerSelfTest.java index d7cef36..2075331 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemLoggerSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemLoggerSelfTest.java @@ -17,7 +17,7 @@ package org.apache.ignite.igfs; -import org.apache.ignite.internal.fs.common.*; +import org.apache.ignite.internal.igfs.common.*; import org.apache.ignite.internal.processors.fs.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ 
-25,7 +25,7 @@ import java.io.*; import java.util.*; import static org.apache.ignite.igfs.IgfsMode.*; -import static org.apache.ignite.internal.fs.common.IgfsLogger.*; +import static org.apache.ignite.internal.igfs.common.IgfsLogger.*; /** * Grid GGFS client logger test. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemLoggerStateSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemLoggerStateSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemLoggerStateSelfTest.java index affedc0..a135fd7 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemLoggerStateSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemLoggerStateSelfTest.java @@ -23,7 +23,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; import org.apache.ignite.igfs.hadoop.v1.*; -import org.apache.ignite.internal.fs.common.*; +import org.apache.ignite.internal.igfs.common.*; import org.apache.ignite.internal.processors.fs.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemSecondaryModeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemSecondaryModeSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemSecondaryModeSelfTest.java index 728cf18..c8a388f 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemSecondaryModeSelfTest.java +++ 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemSecondaryModeSelfTest.java @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.*; import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; import org.apache.ignite.igfs.hadoop.v1.*; -import org.apache.ignite.internal.fs.hadoop.*; +import org.apache.ignite.internal.igfs.hadoop.*; import org.apache.ignite.internal.processors.fs.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*;
