http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopWrapper.java deleted file mode 100644 index af0c47a..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopWrapper.java +++ /dev/null @@ -1,511 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.fs.hadoop; - -import org.apache.commons.logging.*; -import org.apache.hadoop.conf.*; -import org.apache.ignite.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.processors.fs.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.internal.fs.hadoop.IgfsHadoopEndpoint.*; -import static org.apache.ignite.internal.fs.hadoop.IgfsHadoopUtils.*; - -/** - * Wrapper for GGFS server. - */ -public class IgfsHadoopWrapper implements IgfsHadoop { - /** Delegate. */ - private final AtomicReference<Delegate> delegateRef = new AtomicReference<>(); - - /** Authority. */ - private final String authority; - - /** Connection string. */ - private final IgfsHadoopEndpoint endpoint; - - /** Log directory. */ - private final String logDir; - - /** Configuration. */ - private final Configuration conf; - - /** Logger. */ - private final Log log; - - /** - * Constructor. - * - * @param authority Authority (connection string). - * @param logDir Log directory for server. - * @param conf Configuration. - * @param log Current logger. 
- */ - public IgfsHadoopWrapper(String authority, String logDir, Configuration conf, Log log) throws IOException { - try { - this.authority = authority; - this.endpoint = new IgfsHadoopEndpoint(authority); - this.logDir = logDir; - this.conf = conf; - this.log = log; - } - catch (IgniteCheckedException e) { - throw new IOException("Failed to parse endpoint: " + authority, e); - } - } - - /** {@inheritDoc} */ - @Override public IgfsHandshakeResponse handshake(String logDir) throws IOException { - return withReconnectHandling(new FileSystemClosure<IgfsHandshakeResponse>() { - @Override public IgfsHandshakeResponse apply(IgfsHadoopEx hadoop, - IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { - return hndResp; - } - }); - } - - /** {@inheritDoc} */ - @Override public void close(boolean force) { - Delegate delegate = delegateRef.get(); - - if (delegate != null && delegateRef.compareAndSet(delegate, null)) - delegate.close(force); - } - - /** {@inheritDoc} */ - @Override public IgfsFile info(final IgfsPath path) throws IOException { - return withReconnectHandling(new FileSystemClosure<IgfsFile>() { - @Override public IgfsFile apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.info(path); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IOException { - return withReconnectHandling(new FileSystemClosure<IgfsFile>() { - @Override public IgfsFile apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.update(path, props); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime) - throws IOException { - return withReconnectHandling(new FileSystemClosure<Boolean>() { - @Override public Boolean apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.setTimes(path, accessTime, modificationTime); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IOException { - return withReconnectHandling(new FileSystemClosure<Boolean>() { - @Override public Boolean apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.rename(src, dest); - } - }, src); - } - - /** {@inheritDoc} */ - @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IOException { - return withReconnectHandling(new FileSystemClosure<Boolean>() { - @Override public Boolean apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.delete(path, recursive); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start, - final long len) throws IOException { - return withReconnectHandling(new FileSystemClosure<Collection<IgfsBlockLocation>>() { - @Override public Collection<IgfsBlockLocation> apply(IgfsHadoopEx hadoop, - IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { - return hadoop.affinity(path, start, len); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IOException { - return withReconnectHandling(new 
FileSystemClosure<IgfsPathSummary>() { - @Override public IgfsPathSummary apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.contentSummary(path); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IOException { - return withReconnectHandling(new FileSystemClosure<Boolean>() { - @Override public Boolean apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.mkdirs(path, props); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IOException { - return withReconnectHandling(new FileSystemClosure<Collection<IgfsFile>>() { - @Override public Collection<IgfsFile> apply(IgfsHadoopEx hadoop, - IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { - return hadoop.listFiles(path); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IOException { - return withReconnectHandling(new FileSystemClosure<Collection<IgfsPath>>() { - @Override public Collection<IgfsPath> apply(IgfsHadoopEx hadoop, - IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { - return hadoop.listPaths(path); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public IgfsStatus fsStatus() throws IOException { - return withReconnectHandling(new FileSystemClosure<IgfsStatus>() { - @Override public IgfsStatus apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) - throws IgniteCheckedException, IOException { - return hadoop.fsStatus(); - } - }); - } - - /** {@inheritDoc} */ - @Override public IgfsHadoopStreamDelegate open(final IgfsPath path) throws IOException { - return withReconnectHandling(new FileSystemClosure<IgfsHadoopStreamDelegate>() { - @Override public IgfsHadoopStreamDelegate apply(IgfsHadoopEx hadoop, - IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { - return hadoop.open(path); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public IgfsHadoopStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch) - throws IOException { - return withReconnectHandling(new FileSystemClosure<IgfsHadoopStreamDelegate>() { - @Override public IgfsHadoopStreamDelegate apply(IgfsHadoopEx hadoop, - IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { - return hadoop.open(path, seqReadsBeforePrefetch); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public IgfsHadoopStreamDelegate create(final IgfsPath path, final boolean overwrite, - final boolean colocate, final int replication, final long blockSize, @Nullable final Map<String, String> props) - throws IOException { - return withReconnectHandling(new FileSystemClosure<IgfsHadoopStreamDelegate>() { - @Override public IgfsHadoopStreamDelegate apply(IgfsHadoopEx hadoop, - IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { - return hadoop.create(path, overwrite, colocate, replication, blockSize, props); - } - }, path); - } - - /** {@inheritDoc} */ - @Override public IgfsHadoopStreamDelegate append(final IgfsPath path, final boolean create, - @Nullable final Map<String, String> props) throws IOException { - return withReconnectHandling(new FileSystemClosure<IgfsHadoopStreamDelegate>() { - @Override public IgfsHadoopStreamDelegate apply(IgfsHadoopEx hadoop, - IgfsHandshakeResponse 
hndResp) throws IgniteCheckedException, IOException { - return hadoop.append(path, create, props); - } - }, path); - } - - /** - * Execute closure which is not path-specific. - * - * @param clo Closure. - * @return Result. - * @throws IOException If failed. - */ - private <T> T withReconnectHandling(FileSystemClosure<T> clo) throws IOException { - return withReconnectHandling(clo, null); - } - - /** - * Execute closure. - * - * @param clo Closure. - * @param path Path for exceptions. - * @return Result. - * @throws IOException If failed. - */ - private <T> T withReconnectHandling(final FileSystemClosure<T> clo, @Nullable IgfsPath path) - throws IOException { - Exception err = null; - - for (int i = 0; i < 2; i++) { - Delegate curDelegate = null; - - boolean close = false; - boolean force = false; - - try { - curDelegate = delegate(); - - assert curDelegate != null; - - close = curDelegate.doomed; - - return clo.apply(curDelegate.hadoop, curDelegate.hndResp); - } - catch (IgfsHadoopCommunicationException e) { - if (curDelegate != null && !curDelegate.doomed) { - // Try getting rid of faulty delegate ASAP. - delegateRef.compareAndSet(curDelegate, null); - - close = true; - force = true; - } - - if (log.isDebugEnabled()) - log.debug("Failed to send message to a server: " + e); - - err = e; - } - catch (IgniteCheckedException e) { - throw IgfsHadoopUtils.cast(e, path != null ? path.toString() : null); - } - finally { - if (close) { - assert curDelegate != null; - - curDelegate.close(force); - } - } - } - - throw new IOException("Failed to communicate with GGFS.", err); - } - - /** - * Get delegate creating it if needed. - * - * @return Delegate. - */ - private Delegate delegate() throws IgfsHadoopCommunicationException { - Exception err = null; - - // 1. If delegate is set, return it immediately. - Delegate curDelegate = delegateRef.get(); - - if (curDelegate != null) - return curDelegate; - - // 2. Guess that we are in the same VM. - if (!parameter(conf, PARAM_GGFS_ENDPOINT_NO_EMBED, authority, false)) { - IgfsEx ggfs = null; - - if (endpoint.grid() == null) { - try { - Ignite ignite = G.ignite(); - - ggfs = (IgfsEx)ignite.fileSystem(endpoint.ggfs()); - } - catch (Exception e) { - err = e; - } - } - else { - for (Ignite ignite : G.allGrids()) { - try { - ggfs = (IgfsEx)ignite.fileSystem(endpoint.ggfs()); - - break; - } - catch (Exception e) { - err = e; - } - } - } - - if (ggfs != null) { - IgfsHadoopEx hadoop = null; - - try { - hadoop = new IgfsHadoopInProc(ggfs, log); - - curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); - } - catch (IOException | IgniteCheckedException e) { - if (e instanceof IgfsHadoopCommunicationException) - hadoop.close(true); - - if (log.isDebugEnabled()) - log.debug("Failed to connect to in-proc GGFS, fallback to IPC mode.", e); - - err = e; - } - } - } - - // 3. Try connecting using shmem. - if (!parameter(conf, PARAM_GGFS_ENDPOINT_NO_LOCAL_SHMEM, authority, false)) { - if (curDelegate == null && !U.isWindows()) { - IgfsHadoopEx hadoop = null; - - try { - hadoop = new IgfsHadoopOutProc(endpoint.port(), endpoint.grid(), endpoint.ggfs(), log); - - curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); - } - catch (IOException | IgniteCheckedException e) { - if (e instanceof IgfsHadoopCommunicationException) - hadoop.close(true); - - if (log.isDebugEnabled()) - log.debug("Failed to connect to out-proc local GGFS using shmem.", e); - - err = e; - } - } - } - - // 4. Try local TCP connection. 
- boolean skipLocTcp = parameter(conf, PARAM_GGFS_ENDPOINT_NO_LOCAL_TCP, authority, false); - - if (!skipLocTcp) { - if (curDelegate == null) { - IgfsHadoopEx hadoop = null; - - try { - hadoop = new IgfsHadoopOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.ggfs(), - log); - - curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); - } - catch (IOException | IgniteCheckedException e) { - if (e instanceof IgfsHadoopCommunicationException) - hadoop.close(true); - - if (log.isDebugEnabled()) - log.debug("Failed to connect to out-proc local GGFS using TCP.", e); - - err = e; - } - } - } - - // 5. Try remote TCP connection. - if (curDelegate == null && (skipLocTcp || !F.eq(LOCALHOST, endpoint.host()))) { - IgfsHadoopEx hadoop = null; - - try { - hadoop = new IgfsHadoopOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.ggfs(), log); - - curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); - } - catch (IOException | IgniteCheckedException e) { - if (e instanceof IgfsHadoopCommunicationException) - hadoop.close(true); - - if (log.isDebugEnabled()) - log.debug("Failed to connect to out-proc remote GGFS using TCP.", e); - - err = e; - } - } - - if (curDelegate != null) { - if (!delegateRef.compareAndSet(null, curDelegate)) - curDelegate.doomed = true; - - return curDelegate; - } - else - throw new IgfsHadoopCommunicationException("Failed to connect to GGFS: " + endpoint, err); - } - - /** - * File system operation closure. - */ - private static interface FileSystemClosure<T> { - /** - * Call closure body. - * - * @param hadoop RPC handler. - * @param hndResp Handshake response. - * @return Result. - * @throws IgniteCheckedException If failed. - * @throws IOException If failed. - */ - public T apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException; - } - - /** - * Delegate. - */ - private static class Delegate { - /** RPC handler. */ - private final IgfsHadoopEx hadoop; - - /** Handshake response. */ - private final IgfsHandshakeResponse hndResp; - - /** Close guard. */ - private final AtomicBoolean closeGuard = new AtomicBoolean(); - - /** Whether this delegate must be closed at the end of the next invocation. */ - private boolean doomed; - - /** - * Constructor. - * - * @param hadoop Hadoop. - * @param hndResp Handshake response. - */ - private Delegate(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) { - this.hadoop = hadoop; - this.hndResp = hndResp; - } - - /** - * Close underlying RPC handler. - * - * @param force Force flag. - */ - private void close(boolean force) { - if (closeGuard.compareAndSet(false, true)) - hadoop.close(force); - } - } -}
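The wrapper deleted above is re-created under org.apache.ignite.internal.igfs.hadoop later in this same commit; its core is the reconnect-handling loop in withReconnectHandling() together with the CAS-guarded delegate cache. A minimal, hypothetical sketch of that pattern follows — ReconnectingFacade, Connector and Closure are placeholder names introduced purely for illustration, not Ignite API — assuming a single cached connection and exactly one retry, as in the code above.

// Simplified sketch of the wrapper's retry pattern; all names below are
// illustrative placeholders, not part of the Ignite codebase.
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

public class ReconnectingFacade<C> {
    /** Supplies a fresh connection (the sketch's analogue of delegate()). */
    public interface Connector<C> {
        C connect() throws IOException;
    }

    /** One remote operation executed against a live connection. */
    public interface Closure<C, T> {
        T apply(C client) throws IOException;
    }

    /** Cached connection shared by all operations. */
    private final AtomicReference<C> clientRef = new AtomicReference<>();

    private final Connector<C> connector;

    public ReconnectingFacade(Connector<C> connector) {
        this.connector = connector;
    }

    /** Executes {@code clo}, transparently reconnecting once on failure. */
    public <T> T withReconnectHandling(Closure<C, T> clo) throws IOException {
        IOException err = null;

        for (int attempt = 0; attempt < 2; attempt++) {
            C client = null;

            try {
                client = clientRef.get();

                if (client == null) {
                    client = connector.connect();

                    // Another thread may have connected concurrently; keep one
                    // shared copy. (The real wrapper marks the losing delegate
                    // "doomed" and closes it after its next use.)
                    if (!clientRef.compareAndSet(null, client)) {
                        C existing = clientRef.get();

                        if (existing != null)
                            client = existing;
                    }
                }

                return clo.apply(client);
            }
            catch (IOException e) {
                // Evict the (presumably broken) cached connection so that the
                // second attempt establishes a fresh one.
                if (client != null)
                    clientRef.compareAndSet(client, null);

                err = e;
            }
        }

        throw new IOException("Failed to communicate with server.", err);
    }
}

The design point preserved from the original: eviction uses compareAndSet rather than a plain set, so a connection re-established by another thread between the failure and the cleanup is never discarded by mistake.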
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/package.html ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/package.html b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/package.html deleted file mode 100644 index 4520df8..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/package.html +++ /dev/null @@ -1,24 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> - -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<html> -<body> - <!-- Package description. --> - Contains GGFS client classes. -</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/package.html ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/package.html b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/package.html deleted file mode 100644 index 4f47151..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/package.html +++ /dev/null @@ -1,24 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> - -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<html> -<body> - <!-- Package description. --> - Contains GGFS client and common classes. 
-</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoop.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoop.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoop.java new file mode 100644 index 0000000..325dd16 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoop.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.ignite.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.processors.fs.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +/** + * Facade for communication with grid. + */ +public interface IgfsHadoop { + /** + * Perform handshake. + * + * @param logDir Log directory. + * @return Future with handshake result. + * @throws IgniteCheckedException If failed. + */ + public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException, IOException; + + /** + * Close connection. + * + * @param force Force flag. + */ + public void close(boolean force); + + /** + * Command to retrieve file info for some GGFS path. + * + * @param path Path to get file info for. + * @return Future for info operation. + * @throws IgniteCheckedException If failed. + */ + public IgfsFile info(IgfsPath path) throws IgniteCheckedException, IOException; + + /** + * Command to update file properties. + * + * @param path GGFS path to update properties. + * @param props Properties to update. + * @return Future for update operation. + * @throws IgniteCheckedException If failed. + */ + public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException; + + /** + * Sets last access time and last modification time for a file. + * + * @param path Path to update times. + * @param accessTime Last access time to set. + * @param modificationTime Last modification time to set. + * @throws IgniteCheckedException If failed. + */ + public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException, + IOException; + + /** + * Command to rename given path. + * + * @param src Source path. + * @param dest Destination path. + * @return Future for rename operation. + * @throws IgniteCheckedException If failed. + */ + public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException, IOException; + + /** + * Command to delete given path. + * + * @param path Path to delete. + * @param recursive {@code True} if deletion is recursive. 
+ * @return Future for delete operation. + * @throws IgniteCheckedException If failed. + */ + public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException, IOException; + + /** + * Command to get affinity for given path, offset and length. + * + * @param path Path to get affinity for. + * @param start Start position (offset). + * @param len Data length. + * @return Future for affinity command. + * @throws IgniteCheckedException If failed. + */ + public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) throws IgniteCheckedException, + IOException; + + /** + * Gets path summary. + * + * @param path Path to get summary for. + * @return Future that will be completed when summary is received. + * @throws IgniteCheckedException If failed. + */ + public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException, IOException; + + /** + * Command to create directories. + * + * @param path Path to create. + * @return Future for mkdirs operation. + * @throws IgniteCheckedException If failed. + */ + public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException; + + /** + * Command to get list of files in directory. + * + * @param path Path to list. + * @return Future for listFiles operation. + * @throws IgniteCheckedException If failed. + */ + public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException, IOException; + + /** + * Command to get directory listing. + * + * @param path Path to list. + * @return Future for listPaths operation. + * @throws IgniteCheckedException If failed. + */ + public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException, IOException; + + /** + * Performs status request. + * + * @return Status response. + * @throws IgniteCheckedException If failed. + */ + public IgfsStatus fsStatus() throws IgniteCheckedException, IOException; + + /** + * Command to open file for reading. + * + * @param path File path to open. + * @return Future for open operation. + * @throws IgniteCheckedException If failed. + */ + public IgfsHadoopStreamDelegate open(IgfsPath path) throws IgniteCheckedException, IOException; + + /** + * Command to open file for reading. + * + * @param path File path to open. + * @return Future for open operation. + * @throws IgniteCheckedException If failed. + */ + public IgfsHadoopStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch) throws IgniteCheckedException, + IOException; + + /** + * Command to create file and open it for output. + * + * @param path Path to file. + * @param overwrite If {@code true} then old file contents will be lost. + * @param colocate If {@code true} and called on data node, file will be written on that node. + * @param replication Replication factor. + * @param props File properties for creation. + * @return Stream descriptor. + * @throws IgniteCheckedException If failed. + */ + public IgfsHadoopStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate, + int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException, IOException; + + /** + * Open file for output appending data to the end of a file. + * + * @param path Path to file. + * @param create If {@code true}, file will be created if it does not exist. + * @param props File properties. + * @return Stream descriptor. + * @throws IgniteCheckedException If failed. 
+ */ + public IgfsHadoopStreamDelegate append(IgfsPath path, boolean create, + @Nullable Map<String, String> props) throws IgniteCheckedException, IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopCommunicationException.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopCommunicationException.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopCommunicationException.java new file mode 100644 index 0000000..f892fb1 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopCommunicationException.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.ignite.*; + +/** + * Communication exception indicating a problem between file system and GGFS instance. + */ +public class IgfsHadoopCommunicationException extends IgniteCheckedException { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Creates new exception with given throwable as a nested cause and + * source of error message. + * + * @param cause Non-null throwable cause. + */ + public IgfsHadoopCommunicationException(Exception cause) { + super(cause); + } + + /** + * Creates a new exception with given error message and optional nested cause exception. + * + * @param msg Error message. + */ + public IgfsHadoopCommunicationException(String msg) { + super(msg); + } + + /** + * Creates a new exception with given error message and optional nested cause exception. + * + * @param msg Error message. + * @param cause Cause. + */ + public IgfsHadoopCommunicationException(String msg, Exception cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEndpoint.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEndpoint.java new file mode 100644 index 0000000..42feaae --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEndpoint.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; + +import static org.apache.ignite.configuration.IgfsConfiguration.*; + +/** + * GGFS endpoint abstraction. + */ +public class IgfsHadoopEndpoint { + /** Localhost. */ + public static final String LOCALHOST = "127.0.0.1"; + + /** GGFS name. */ + private final String ggfsName; + + /** Grid name. */ + private final String gridName; + + /** Host. */ + private final String host; + + /** Port. */ + private final int port; + + /** + * Normalize GGFS URI. + * + * @param uri URI. + * @return Normalized URI. + * @throws IOException If failed. + */ + public static URI normalize(URI uri) throws IOException { + try { + if (!F.eq(IgniteFs.GGFS_SCHEME, uri.getScheme())) + throw new IOException("Failed to normalize URI because it has non-GGFS scheme: " + uri); + + IgfsHadoopEndpoint endpoint = new IgfsHadoopEndpoint(uri.getAuthority()); + + StringBuilder sb = new StringBuilder(); + + if (endpoint.ggfs() != null) + sb.append(endpoint.ggfs()); + + if (endpoint.grid() != null) + sb.append(":").append(endpoint.grid()); + + return new URI(uri.getScheme(), sb.length() != 0 ? sb.toString() : null, endpoint.host(), endpoint.port(), + uri.getPath(), uri.getQuery(), uri.getFragment()); + } + catch (URISyntaxException | IgniteCheckedException e) { + throw new IOException("Failed to normalize URI: " + uri, e); + } + } + + /** + * Constructor. + * + * @param connStr Connection string. + * @throws IgniteCheckedException If failed to parse connection string. + */ + public IgfsHadoopEndpoint(@Nullable String connStr) throws IgniteCheckedException { + if (connStr == null) + connStr = ""; + + String[] tokens = connStr.split("@", -1); + + IgniteBiTuple<String, Integer> hostPort; + + if (tokens.length == 1) { + ggfsName = null; + gridName = null; + + hostPort = hostPort(connStr, connStr); + } + else if (tokens.length == 2) { + String authStr = tokens[0]; + + if (authStr.isEmpty()) { + gridName = null; + ggfsName = null; + } + else { + String[] authTokens = authStr.split(":", -1); + + ggfsName = F.isEmpty(authTokens[0]) ? null : authTokens[0]; + + if (authTokens.length == 1) + gridName = null; + else if (authTokens.length == 2) + gridName = F.isEmpty(authTokens[1]) ? null : authTokens[1]; + else + throw new IgniteCheckedException("Invalid connection string format: " + connStr); + } + + hostPort = hostPort(connStr, tokens[1]); + } + else + throw new IgniteCheckedException("Invalid connection string format: " + connStr); + + host = hostPort.get1(); + + assert hostPort.get2() != null; + + port = hostPort.get2(); + } + + /** + * Parse host and port. + * + * @param connStr Full connection string. 
+ * @param hostPortStr Host/port connection string part. + * @return Tuple with host and port. + * @throws IgniteCheckedException If failed to parse connection string. + */ + private IgniteBiTuple<String, Integer> hostPort(String connStr, String hostPortStr) throws IgniteCheckedException { + String[] tokens = hostPortStr.split(":", -1); + + String host = tokens[0]; + + if (F.isEmpty(host)) + host = LOCALHOST; + + int port; + + if (tokens.length == 1) + port = DFLT_IPC_PORT; + else if (tokens.length == 2) { + String portStr = tokens[1]; + + try { + port = Integer.valueOf(portStr); + + if (port < 0 || port > 65535) + throw new IgniteCheckedException("Invalid port number: " + connStr); + } + catch (NumberFormatException e) { + throw new IgniteCheckedException("Invalid port number: " + connStr); + } + } + else + throw new IgniteCheckedException("Invalid connection string format: " + connStr); + + return F.t(host, port); + } + + /** + * @return GGFS name. + */ + @Nullable public String ggfs() { + return ggfsName; + } + + /** + * @return Grid name. + */ + @Nullable public String grid() { + return gridName; + } + + /** + * @return Host. + */ + public String host() { + return host; + } + + /** + * @return {@code True} if endpoint refers to local host. + */ + public boolean isLocal() { + return F.eq(LOCALHOST, host); + } + + /** + * @return Port. + */ + public int port() { + return port; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgfsHadoopEndpoint.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEx.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEx.java new file mode 100644 index 0000000..aa9de45 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEx.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.ignite.internal.util.lang.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Extended GGFS server interface. + */ +public interface IgfsHadoopEx extends IgfsHadoop { + /** + * Adds event listener that will be invoked when connection with server is lost or remote error has occurred. + * If connection is closed already, callback will be invoked synchronously inside this method. + * + * @param delegate Stream delegate. + * @param lsnr Event listener. 
+ */ + public void addEventListener(IgfsHadoopStreamDelegate delegate, IgfsHadoopStreamEventListener lsnr); + + /** + * Removes event listener that will be invoked when connection with server is lost or remote error has occurred. + * + * @param delegate Stream delegate. + */ + public void removeEventListener(IgfsHadoopStreamDelegate delegate); + + /** + * Asynchronously reads specified amount of bytes from opened input stream. + * + * @param delegate Stream delegate. + * @param pos Position to read from. + * @param len Data length to read. + * @param outBuf Optional output buffer. If buffer length is less than {@code len}, all remaining + * bytes will be read into newly allocated buffer of length {len - outBuf.length} and this buffer will + * be the result of read future. + * @param outOff Output offset. + * @param outLen Output length. + * @return Read data. + */ + public GridPlainFuture<byte[]> readData(IgfsHadoopStreamDelegate delegate, long pos, int len, + @Nullable final byte[] outBuf, final int outOff, final int outLen); + + /** + * Writes data to the stream with given streamId. This method does not return any future since + * no response to write request is sent. + * + * @param delegate Stream delegate. + * @param data Data to write. + * @param off Offset. + * @param len Length. + * @throws IOException If failed. + */ + public void writeData(IgfsHadoopStreamDelegate delegate, byte[] data, int off, int len) throws IOException; + + /** + * Close server stream. + * + * @param delegate Stream delegate. + * @throws IOException If failed. + */ + public void closeStream(IgfsHadoopStreamDelegate delegate) throws IOException; + + /** + * Flush output stream. + * + * @param delegate Stream delegate. + * @throws IOException If failed. + */ + public void flush(IgfsHadoopStreamDelegate delegate) throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java new file mode 100644 index 0000000..e0ea1b6 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.hadoop.fs.permission.*; +import org.apache.ignite.*; + +import java.util.*; + +import static org.apache.ignite.IgniteFs.*; + +/** + * Hadoop file system properties. 
+ */ +class IgfsHadoopFSProperties { + /** Username. */ + private String usrName; + + /** Group name. */ + private String grpName; + + /** Permissions. */ + private FsPermission perm; + + /** + * Constructor. + * + * @param props Properties. + * @throws IgniteException In case of error. + */ + IgfsHadoopFSProperties(Map<String, String> props) throws IgniteException { + usrName = props.get(PROP_USER_NAME); + grpName = props.get(PROP_GROUP_NAME); + + String permStr = props.get(PROP_PERMISSION); + + if (permStr != null) { + try { + perm = new FsPermission((short)Integer.parseInt(permStr, 8)); + } + catch (NumberFormatException ignore) { + throw new IgniteException("Permissions cannot be parsed: " + permStr); + } + } + } + + /** + * Get user name. + * + * @return User name. + */ + String userName() { + return usrName; + } + + /** + * Get group name. + * + * @return Group name. + */ + String groupName() { + return grpName; + } + + /** + * Get permission. + * + * @return Permission. + */ + FsPermission permission() { + return perm; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFileSystemWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFileSystemWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFileSystemWrapper.java new file mode 100644 index 0000000..fbf27f0 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFileSystemWrapper.java @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.permission.*; +import org.apache.hadoop.ipc.*; +import org.apache.ignite.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.processors.fs.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; +import java.util.*; + +/** + * Adapter to use any Hadoop file system {@link org.apache.hadoop.fs.FileSystem} as {@link org.apache.ignite.igfs.Igfs}. + */ +public class IgfsHadoopFileSystemWrapper implements Igfs, AutoCloseable { + /** Property name for path to Hadoop configuration. */ + public static final String SECONDARY_FS_CONFIG_PATH = "SECONDARY_FS_CONFIG_PATH"; + + /** Property name for URI of file system. 
*/ + public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI"; + + /** Hadoop file system. */ + private final FileSystem fileSys; + + /** Properties of file system */ + private final Map<String, String> props = new HashMap<>(); + + /** + * Constructor. + * + * @param uri URI of file system. + * @param cfgPath Additional path to Hadoop configuration. + * @throws IgniteCheckedException In case of error. + */ + public IgfsHadoopFileSystemWrapper(@Nullable String uri, @Nullable String cfgPath) throws IgniteCheckedException { + Configuration cfg = new Configuration(); + + if (cfgPath != null) + cfg.addResource(U.resolveIgniteUrl(cfgPath)); + + try { + fileSys = uri == null ? FileSystem.get(cfg) : FileSystem.get(new URI(uri), cfg); + } + catch (IOException | URISyntaxException e) { + throw new IgniteCheckedException(e); + } + + uri = fileSys.getUri().toString(); + + if (!uri.endsWith("/")) + uri += "/"; + + props.put(SECONDARY_FS_CONFIG_PATH, cfgPath); + props.put(SECONDARY_FS_URI, uri); + } + + /** + * Convert GGFS path into Hadoop path. + * + * @param path GGFS path. + * @return Hadoop path. + */ + private Path convert(IgfsPath path) { + URI uri = fileSys.getUri(); + + return new Path(uri.getScheme(), uri.getAuthority(), path.toString()); + } + + /** + * Heuristically checks if exception was caused by invalid HDFS version and returns appropriate exception. + * + * @param e Exception to check. + * @param detailMsg Detailed error message. + * @return Appropriate exception. + */ + private IgfsException handleSecondaryFsError(IOException e, String detailMsg) { + boolean wrongVer = X.hasCause(e, RemoteException.class) || + (e.getMessage() != null && e.getMessage().contains("Failed on local")); + + IgfsException ggfsErr = !wrongVer ? cast(detailMsg, e) : + new IgfsInvalidHdfsVersionException("HDFS version you are connecting to differs from local " + + "version.", e); + + + + return ggfsErr; + } + + /** + * Cast IO exception to GGFS exception. + * + * @param e IO exception. + * @return GGFS exception. + */ + public static IgfsException cast(String msg, IOException e) { + if (e instanceof FileNotFoundException) + return new IgfsFileNotFoundException(e); + else if (e instanceof ParentNotDirectoryException) + return new IgfsParentNotDirectoryException(msg, e); + else if (e instanceof PathIsNotEmptyDirectoryException) + return new IgfsDirectoryNotEmptyException(e); + else if (e instanceof PathExistsException) + return new IgfsPathAlreadyExistsException(msg, e); + else + return new IgfsException(msg, e); + } + + /** + * Convert Hadoop FileStatus properties to map. + * + * @param status File status. + * @return GGFS attributes. 
+ */ + private static Map<String, String> properties(FileStatus status) { + FsPermission perm = status.getPermission(); + + if (perm == null) + perm = FsPermission.getDefault(); + + return F.asMap(PROP_PERMISSION, String.format("%04o", perm.toShort()), PROP_USER_NAME, status.getOwner(), + PROP_GROUP_NAME, status.getGroup()); + } + + /** {@inheritDoc} */ + @Override public boolean exists(IgfsPath path) { + try { + return fileSys.exists(convert(path)); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]"); + } + } + + /** {@inheritDoc} */ + @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) { + IgfsHadoopFSProperties props0 = new IgfsHadoopFSProperties(props); + + try { + if (props0.userName() != null || props0.groupName() != null) + fileSys.setOwner(convert(path), props0.userName(), props0.groupName()); + + if (props0.permission() != null) + fileSys.setPermission(convert(path), props0.permission()); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]"); + } + + //Result is not used in case of secondary FS. + return null; + } + + /** {@inheritDoc} */ + @Override public void rename(IgfsPath src, IgfsPath dest) { + // Delegate to the secondary file system. + try { + if (!fileSys.rename(convert(src), convert(dest))) + throw new IgfsException("Failed to rename (secondary file system returned false) " + + "[src=" + src + ", dest=" + dest + ']'); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to rename file [src=" + src + ", dest=" + dest + ']'); + } + } + + /** {@inheritDoc} */ + @Override public boolean delete(IgfsPath path, boolean recursive) { + try { + return fileSys.delete(convert(path), recursive); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]"); + } + } + + /** {@inheritDoc} */ + @Override public void mkdirs(IgfsPath path) { + try { + if (!fileSys.mkdirs(convert(path))) + throw new IgniteException("Failed to make directories [path=" + path + "]"); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + "]"); + } + } + + /** {@inheritDoc} */ + @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) { + try { + if (!fileSys.mkdirs(convert(path), new IgfsHadoopFSProperties(props).permission())) + throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]"); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + ", props=" + props + "]"); + } + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsPath> listPaths(IgfsPath path) { + try { + FileStatus[] statuses = fileSys.listStatus(convert(path)); + + if (statuses == null) + throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path); + + Collection<IgfsPath> res = new ArrayList<>(statuses.length); + + for (FileStatus status : statuses) + res.add(new IgfsPath(path, status.getPath().getName())); + + return res; + } + catch (FileNotFoundException ignored) { + throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path); + } + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsFile> listFiles(IgfsPath 
path) { + try { + FileStatus[] statuses = fileSys.listStatus(convert(path)); + + if (statuses == null) + throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path); + + Collection<IgfsFile> res = new ArrayList<>(statuses.length); + + for (FileStatus status : statuses) { + IgfsFileInfo fsInfo = status.isDirectory() ? new IgfsFileInfo(true, properties(status)) : + new IgfsFileInfo((int)status.getBlockSize(), status.getLen(), null, null, false, + properties(status)); + + res.add(new IgfsFileImpl(new IgfsPath(path, status.getPath().getName()), fsInfo, 1)); + } + + return res; + } + catch (FileNotFoundException ignored) { + throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path); + } + } + + /** {@inheritDoc} */ + @Override public IgfsReader open(IgfsPath path, int bufSize) { + return new IgfsHadoopReader(fileSys, convert(path), bufSize); + } + + /** {@inheritDoc} */ + @Override public OutputStream create(IgfsPath path, boolean overwrite) { + try { + return fileSys.create(convert(path), overwrite); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]"); + } + } + + /** {@inheritDoc} */ + @Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication, + long blockSize, @Nullable Map<String, String> props) { + IgfsHadoopFSProperties props0 = + new IgfsHadoopFSProperties(props != null ? props : Collections.<String, String>emptyMap()); + + try { + return fileSys.create(convert(path), props0.permission(), overwrite, bufSize, (short)replication, blockSize, + null); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props + + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ", replication=" + replication + + ", blockSize=" + blockSize + "]"); + } + } + + /** {@inheritDoc} */ + @Override public OutputStream append(IgfsPath path, int bufSize, boolean create, + @Nullable Map<String, String> props) { + try { + return fileSys.append(convert(path), bufSize); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]"); + } + } + + /** {@inheritDoc} */ + @Override public IgfsFile info(final IgfsPath path) { + try { + final FileStatus status = fileSys.getFileStatus(convert(path)); + + if (status == null) + return null; + + final Map<String, String> props = properties(status); + + return new IgfsFile() { + @Override public IgfsPath path() { + return path; + } + + @Override public boolean isFile() { + return status.isFile(); + } + + @Override public boolean isDirectory() { + return status.isDirectory(); + } + + @Override public int blockSize() { + return (int)status.getBlockSize(); + } + + @Override public long groupBlockSize() { + return status.getBlockSize(); + } + + @Override public long accessTime() { + return status.getAccessTime(); + } + + @Override public long modificationTime() { + return status.getModificationTime(); + } + + @Override public String property(String name) throws IllegalArgumentException { + String val = props.get(name); + + if (val == null) + throw new IllegalArgumentException("File property not found [path=" + path + ", name=" + name + ']'); + + return val; + } + + @Nullable @Override public String property(String name, @Nullable 
String dfltVal) { + String val = props.get(name); + + return val == null ? dfltVal : val; + } + + @Override public long length() { + return status.getLen(); + } + + /** {@inheritDoc} */ + @Override public Map<String, String> properties() { + return props; + } + }; + + } + catch (FileNotFoundException ignore) { + return null; + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to get file status [path=" + path + "]"); + } + } + + /** {@inheritDoc} */ + @Override public long usedSpaceSize() { + try { + return fileSys.getContentSummary(new Path(fileSys.getUri())).getSpaceConsumed(); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to get used space size of file system."); + } + } + + /** {@inheritDoc} */ + @Nullable @Override public Map<String, String> properties() { + return props; + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteCheckedException { + try { + fileSys.close(); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFuture.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFuture.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFuture.java new file mode 100644 index 0000000..3d4bda8 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFuture.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.ignite.internal.util.lang.*; +import org.jetbrains.annotations.*; + +/** + * GGFS client future that holds response parse closure. + */ +public class IgfsHadoopFuture<T> extends GridPlainFutureAdapter<T> { + /** Output buffer. */ + private byte[] outBuf; + + /** Output offset. */ + private int outOff; + + /** Output length. */ + private int outLen; + + /** Read future flag. */ + private boolean read; + + /** + * @return Output buffer. + */ + public byte[] outputBuffer() { + return outBuf; + } + + /** + * @param outBuf Output buffer. + */ + public void outputBuffer(@Nullable byte[] outBuf) { + this.outBuf = outBuf; + } + + /** + * @return Offset in output buffer to write from. + */ + public int outputOffset() { + return outOff; + } + + /** + * @param outOff Offset in output buffer to write from. + */ + public void outputOffset(int outOff) { + this.outOff = outOff; + } + + /** + * @return Length to write to output buffer. 
+ */ + public int outputLength() { + return outLen; + } + + /** + * @param outLen Length to write to output buffer. + */ + public void outputLength(int outLen) { + this.outLen = outLen; + } + + /** + * @param read {@code True} if this is a read future. + */ + public void read(boolean read) { + this.read = read; + } + + /** + * @return {@code True} if this is a read future. + */ + public boolean read() { + return read; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInProc.java new file mode 100644 index 0000000..09cb964 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInProc.java @@ -0,0 +1,409 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.commons.logging.*; +import org.apache.ignite.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.processors.fs.*; +import org.apache.ignite.internal.util.lang.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * Communication with grid in the same process. + */ +public class IgfsHadoopInProc implements IgfsHadoopEx { + /** Target GGFS. */ + private final IgfsEx ggfs; + + /** Buffer size. */ + private final int bufSize; + + /** Event listeners. */ + private final Map<IgfsHadoopStreamDelegate, IgfsHadoopStreamEventListener> lsnrs = + new ConcurrentHashMap<>(); + + /** Logger. */ + private final Log log; + + /** + * Constructor. + * + * @param ggfs Target GGFS. + * @param log Log. + */ + public IgfsHadoopInProc(IgfsEx ggfs, Log log) { + this.ggfs = ggfs; + this.log = log; + + bufSize = ggfs.configuration().getBlockSize() * 2; + } + + /** {@inheritDoc} */ + @Override public IgfsHandshakeResponse handshake(String logDir) { + ggfs.clientLogDirectory(logDir); + + return new IgfsHandshakeResponse(ggfs.name(), ggfs.proxyPaths(), ggfs.groupBlockSize(), + ggfs.globalSampling()); + } + + /** {@inheritDoc} */ + @Override public void close(boolean force) { + // Perform cleanup. 
+ for (IgfsHadoopStreamEventListener lsnr : lsnrs.values()) { + try { + lsnr.onClose(); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to notify stream event listener", e); + } + } + } + + /** {@inheritDoc} */ + @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException { + try { + return ggfs.info(path); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new IgfsHadoopCommunicationException("Failed to get file info because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException { + try { + return ggfs.update(path, props); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new IgfsHadoopCommunicationException("Failed to update file because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException { + try { + ggfs.setTimes(path, accessTime, modificationTime); + + return true; + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new IgfsHadoopCommunicationException("Failed to set path times because Grid is stopping: " + + path); + } + } + + /** {@inheritDoc} */ + @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException { + try { + ggfs.rename(src, dest); + + return true; + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new IgfsHadoopCommunicationException("Failed to rename path because Grid is stopping: " + src); + } + } + + /** {@inheritDoc} */ + @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException { + try { + return ggfs.delete(path, recursive); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new IgfsHadoopCommunicationException("Failed to delete path because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public IgfsStatus fsStatus() throws IgniteCheckedException { + try { + return ggfs.globalSpace(); + } + catch (IllegalStateException e) { + throw new IgfsHadoopCommunicationException("Failed to get file system status because Grid is " + + "stopping."); + } + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException { + try { + return ggfs.listPaths(path); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new IgfsHadoopCommunicationException("Failed to list paths because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException { + try { + return ggfs.listFiles(path); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new IgfsHadoopCommunicationException("Failed to list files because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException { + try { + ggfs.mkdirs(path, props); + + return true; + } + catch (IgniteException e) { + throw new 
IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new IgfsHadoopCommunicationException("Failed to create directory because Grid is stopping: " + + path); + } + } + + /** {@inheritDoc} */ + @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException { + try { + return ggfs.summary(path); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new IgfsHadoopCommunicationException("Failed to get content summary because Grid is stopping: " + + path); + } + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) + throws IgniteCheckedException { + try { + return ggfs.affinity(path, start, len); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new IgfsHadoopCommunicationException("Failed to get affinity because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public IgfsHadoopStreamDelegate open(IgfsPath path) throws IgniteCheckedException { + try { + IgfsInputStreamAdapter stream = ggfs.open(path, bufSize); + + return new IgfsHadoopStreamDelegate(this, stream, stream.fileInfo().length()); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new IgfsHadoopCommunicationException("Failed to open file because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public IgfsHadoopStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch) + throws IgniteCheckedException { + try { + IgfsInputStreamAdapter stream = ggfs.open(path, bufSize, seqReadsBeforePrefetch); + + return new IgfsHadoopStreamDelegate(this, stream, stream.fileInfo().length()); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new IgfsHadoopCommunicationException("Failed to open file because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public IgfsHadoopStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate, + int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException { + try { + IgfsOutputStream stream = ggfs.create(path, bufSize, overwrite, + colocate ? 
ggfs.nextAffinityKey() : null, replication, blockSize, props); + + return new IgfsHadoopStreamDelegate(this, stream); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new IgfsHadoopCommunicationException("Failed to create file because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public IgfsHadoopStreamDelegate append(IgfsPath path, boolean create, + @Nullable Map<String, String> props) throws IgniteCheckedException { + try { + IgfsOutputStream stream = ggfs.append(path, bufSize, create, props); + + return new IgfsHadoopStreamDelegate(this, stream); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new IgfsHadoopCommunicationException("Failed to append file because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public GridPlainFuture<byte[]> readData(IgfsHadoopStreamDelegate delegate, long pos, int len, + @Nullable byte[] outBuf, int outOff, int outLen) { + IgfsInputStreamAdapter stream = delegate.target(); + + try { + byte[] res = null; + + if (outBuf != null) { + int outTailLen = outBuf.length - outOff; + + if (len <= outTailLen) + stream.readFully(pos, outBuf, outOff, len); + else { + stream.readFully(pos, outBuf, outOff, outTailLen); + + int remainderLen = len - outTailLen; + + res = new byte[remainderLen]; + + stream.readFully(pos + outTailLen, res, 0, remainderLen); + } + } + else { + res = new byte[len]; + + stream.readFully(pos, res, 0, len); + } + + return new GridPlainFutureAdapter<>(res); + } + catch (IllegalStateException | IOException e) { + IgfsHadoopStreamEventListener lsnr = lsnrs.get(delegate); + + if (lsnr != null) + lsnr.onError(e.getMessage()); + + return new GridPlainFutureAdapter<>(e); + } + } + + /** {@inheritDoc} */ + @Override public void writeData(IgfsHadoopStreamDelegate delegate, byte[] data, int off, int len) + throws IOException { + try { + IgfsOutputStream stream = delegate.target(); + + stream.write(data, off, len); + } + catch (IllegalStateException | IOException e) { + IgfsHadoopStreamEventListener lsnr = lsnrs.get(delegate); + + if (lsnr != null) + lsnr.onError(e.getMessage()); + + if (e instanceof IllegalStateException) + throw new IOException("Failed to write data to GGFS stream because Grid is stopping.", e); + else + throw e; + } + } + + /** {@inheritDoc} */ + @Override public void flush(IgfsHadoopStreamDelegate delegate) throws IOException { + try { + IgfsOutputStream stream = delegate.target(); + + stream.flush(); + } + catch (IllegalStateException | IOException e) { + IgfsHadoopStreamEventListener lsnr = lsnrs.get(delegate); + + if (lsnr != null) + lsnr.onError(e.getMessage()); + + if (e instanceof IllegalStateException) + throw new IOException("Failed to flush data to GGFS stream because Grid is stopping.", e); + else + throw e; + } + } + + /** {@inheritDoc} */ + @Override public void closeStream(IgfsHadoopStreamDelegate desc) throws IOException { + Closeable closeable = desc.target(); + + try { + closeable.close(); + } + catch (IllegalStateException e) { + throw new IOException("Failed to close GGFS stream because Grid is stopping.", e); + } + } + + /** {@inheritDoc} */ + @Override public void addEventListener(IgfsHadoopStreamDelegate delegate, + IgfsHadoopStreamEventListener lsnr) { + IgfsHadoopStreamEventListener lsnr0 = lsnrs.put(delegate, lsnr); + + assert lsnr0 == null || lsnr0 == lsnr; + + if (log.isDebugEnabled()) + log.debug("Added stream event listener
[delegate=" + delegate + ']'); + } + + /** {@inheritDoc} */ + @Override public void removeEventListener(IgfsHadoopStreamDelegate delegate) { + IgfsHadoopStreamEventListener lsnr0 = lsnrs.remove(delegate); + + if (lsnr0 != null && log.isDebugEnabled()) + log.debug("Removed stream event listener [delegate=" + delegate + ']'); + } +}
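Editor's note: IgfsHadoopFuture above only stores the target-buffer bookkeeping; the transport that completes it lives elsewhere in the client. A minimal sketch of how a response reader might consume those fields follows, assuming GridPlainFutureAdapter's no-arg constructor; respBytes is a hypothetical payload array introduced purely for illustration.

    // A read future is tagged and pointed at the caller's buffer slice before
    // the request is sent, so the response payload can be copied in place
    // instead of being reallocated.
    IgfsHadoopFuture<byte[]> fut = new IgfsHadoopFuture<>();

    byte[] userBuf = new byte[8192];

    fut.read(true);            // Mark as a read future.
    fut.outputBuffer(userBuf); // Destination for the response payload.
    fut.outputOffset(0);
    fut.outputLength(userBuf.length);

    // On response arrival (respBytes is hypothetical), the reader copies
    // straight into the slice recorded on the future:
    byte[] respBytes = new byte[512];

    System.arraycopy(respBytes, 0, fut.outputBuffer(), fut.outputOffset(),
        Math.min(respBytes.length, fut.outputLength()));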
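Editor's note: every metadata operation in IgfsHadoopInProc repeats one translation pattern — IgniteException becomes a checked IgniteCheckedException, and IllegalStateException (thrown while the grid is stopping) becomes IgfsHadoopCommunicationException. The sketch below makes that pattern explicit as a reusable template; the FsCallTemplate class and translate(...) helper are hypothetical and assume compilation in the same package so that IgfsHadoopCommunicationException resolves.

    import java.util.concurrent.Callable;

    import org.apache.ignite.IgniteCheckedException;
    import org.apache.ignite.IgniteException;

    final class FsCallTemplate {
        private FsCallTemplate() {
            // No instances.
        }

        /**
         * Runs a delegated file system call, translating unchecked failures
         * the same way each IgfsHadoopInProc method does inline.
         */
        static <T> T translate(Callable<T> call, String stoppingMsg) throws IgniteCheckedException {
            try {
                return call.call();
            }
            catch (IgniteException e) {
                // Unchecked Ignite failure -> checked exception for the caller.
                throw new IgniteCheckedException(e);
            }
            catch (IllegalStateException e) {
                // Grid is stopping: surface as a communication failure.
                throw new IgfsHadoopCommunicationException(stoppingMsg);
            }
            catch (Exception e) {
                // Callable.call() declares Exception; wrap anything else too.
                throw new IgniteCheckedException(e);
            }
        }
    }

With this helper a method such as delete(...) reduces to a single translate(...) call around ggfs.delete(path, recursive); the actual class spells the try/catch out per method, consistent with a codebase that predates lambdas and uses anonymous classes for closures.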
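Editor's note: readData() splits an oversized read between the free tail of the caller-supplied buffer and a freshly allocated remainder array. The standalone sketch below isolates that arithmetic under stated assumptions: ReadFully is a hypothetical stand-in for the positioned readFully(long, byte[], int, int) of IgfsInputStreamAdapter, and the remainder is read contiguously from pos + outTailLen.

    import java.io.IOException;

    final class ReadSplitSketch {
        /** Positioned-read abstraction standing in for IgfsInputStreamAdapter. */
        interface ReadFully {
            void readFully(long pos, byte[] buf, int off, int len) throws IOException;
        }

        /**
         * Reads len bytes starting at pos: as much as fits goes into the tail
         * of outBuf, the rest into the returned array (null if everything fit).
         */
        static byte[] read(ReadFully stream, long pos, int len, byte[] outBuf, int outOff)
            throws IOException {
            if (outBuf == null) {
                // No caller buffer: allocate the whole result.
                byte[] res = new byte[len];

                stream.readFully(pos, res, 0, len);

                return res;
            }

            int outTailLen = outBuf.length - outOff;

            if (len <= outTailLen) {
                // The whole request fits into the caller's buffer tail.
                stream.readFully(pos, outBuf, outOff, len);

                return null;
            }

            // Fill the tail first, then read the remainder that starts
            // immediately after the bytes already consumed.
            stream.readFully(pos, outBuf, outOff, outTailLen);

            int remainderLen = len - outTailLen;

            byte[] res = new byte[remainderLen];

            stream.readFully(pos + outTailLen, res, 0, remainderLen);

            return res;
        }
    }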
