http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoop.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoop.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoop.java deleted file mode 100644 index d37ccf8..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoop.java +++ /dev/null @@ -1,198 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.fs.hadoop;

import org.apache.ignite.*;
import org.apache.ignite.igfs.*;
import org.apache.ignite.internal.processors.fs.*;
import org.jetbrains.annotations.*;

import java.io.*;
import java.util.*;

/**
 * Facade for communication with grid.
 * <p>
 * Every operation returns its result directly (not a future) and reports failures through
 * {@link IgniteCheckedException} or {@link IOException}.
 */
public interface IgfsHadoop {
    /**
     * Performs handshake.
     *
     * @param logDir Log directory.
     * @return Handshake response.
     * @throws IgniteCheckedException If failed.
     * @throws IOException If failed.
     */
    IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException, IOException;

    /**
     * Closes connection.
     *
     * @param force Force flag.
     */
    void close(boolean force);

    /**
     * Retrieves file info for a GGFS path.
     *
     * @param path Path to get file info for.
     * @return File info.
     * @throws IgniteCheckedException If failed.
     * @throws IOException If failed.
     */
    IgfsFile info(IgfsPath path) throws IgniteCheckedException, IOException;

    /**
     * Updates file properties.
     *
     * @param path GGFS path to update properties for.
     * @param props Properties to update.
     * @return File info returned by the update operation.
     * @throws IgniteCheckedException If failed.
     * @throws IOException If failed.
     */
    IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException;

    /**
     * Sets last access time and last modification time for a file.
     *
     * @param path Path to update times for.
     * @param accessTime Last access time to set.
     * @param modificationTime Last modification time to set.
     * @return Result flag of the operation (presumably success indicator; confirm against implementations).
     * @throws IgniteCheckedException If failed.
     * @throws IOException If failed.
     */
    Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException,
        IOException;

    /**
     * Renames given path.
     *
     * @param src Source path.
     * @param dest Destination path.
     * @return Result flag of the rename operation.
     * @throws IgniteCheckedException If failed.
     * @throws IOException If failed.
     */
    Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException, IOException;

    /**
     * Deletes given path.
     *
     * @param path Path to delete.
     * @param recursive {@code True} if deletion is recursive.
     * @return Result flag of the delete operation.
     * @throws IgniteCheckedException If failed.
     * @throws IOException If failed.
     */
    Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException, IOException;

    /**
     * Gets affinity for given path, offset and length.
     *
     * @param path Path to get affinity for.
     * @param start Start position (offset).
     * @param len Data length.
     * @return Block locations for the requested range.
     * @throws IgniteCheckedException If failed.
     * @throws IOException If failed.
     */
    Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) throws IgniteCheckedException,
        IOException;

    /**
     * Gets path summary.
     *
     * @param path Path to get summary for.
     * @return Path summary.
     * @throws IgniteCheckedException If failed.
     * @throws IOException If failed.
     */
    IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException, IOException;

    /**
     * Creates directories.
     *
     * @param path Path to create.
     * @param props Properties passed along with the request.
     * @return Result flag of the mkdirs operation.
     * @throws IgniteCheckedException If failed.
     * @throws IOException If failed.
     */
    Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException;

    /**
     * Gets list of files in directory.
     *
     * @param path Path to list.
     * @return Files found under the path.
     * @throws IgniteCheckedException If failed.
     * @throws IOException If failed.
     */
    Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException, IOException;

    /**
     * Gets directory listing.
     *
     * @param path Path to list.
     * @return Paths found under the path.
     * @throws IgniteCheckedException If failed.
     * @throws IOException If failed.
     */
    Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException, IOException;

    /**
     * Performs status request.
     *
     * @return Status response.
     * @throws IgniteCheckedException If failed.
     * @throws IOException If failed.
     */
    IgfsStatus fsStatus() throws IgniteCheckedException, IOException;

    /**
     * Opens file for reading.
     *
     * @param path File path to open.
     * @return Stream descriptor.
     * @throws IgniteCheckedException If failed.
     * @throws IOException If failed.
     */
    IgfsHadoopStreamDelegate open(IgfsPath path) throws IgniteCheckedException, IOException;

    /**
     * Opens file for reading.
     *
     * @param path File path to open.
     * @param seqReadsBeforePrefetch Sequential reads before prefetch (as the name suggests; exact semantics
     *      are defined by implementations).
     * @return Stream descriptor.
     * @throws IgniteCheckedException If failed.
     * @throws IOException If failed.
     */
    IgfsHadoopStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch) throws IgniteCheckedException,
        IOException;

    /**
     * Creates file and opens it for output.
     *
     * @param path Path to file.
     * @param overwrite If {@code true} then old file contents will be lost.
     * @param colocate If {@code true} and called on data node, file will be written on that node.
     * @param replication Replication factor.
     * @param blockSize Block size.
     * @param props File properties for creation.
     * @return Stream descriptor.
     * @throws IgniteCheckedException If failed.
     * @throws IOException If failed.
     */
    IgfsHadoopStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate,
        int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException, IOException;

    /**
     * Opens file for output appending data to the end of a file.
     *
     * @param path Path to file.
     * @param create If {@code true}, file will be created if does not exist.
     * @param props File properties.
     * @return Stream descriptor.
     * @throws IgniteCheckedException If failed.
     * @throws IOException If failed.
     */
    IgfsHadoopStreamDelegate append(IgfsPath path, boolean create,
        @Nullable Map<String, String> props) throws IgniteCheckedException, IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopCommunicationException.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopCommunicationException.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopCommunicationException.java deleted file mode 100644 index 6fc53f0..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopCommunicationException.java +++ /dev/null @@ -1,57 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.fs.hadoop;

import org.apache.ignite.*;

/**
 * Communication exception indicating a problem between file system and GGFS instance.
 */
public class IgfsHadoopCommunicationException extends IgniteCheckedException {
    /** */
    private static final long serialVersionUID = 0L;

    /**
     * Creates a new exception with given error message.
     *
     * @param msg Error message.
     */
    public IgfsHadoopCommunicationException(String msg) {
        super(msg);
    }

    /**
     * Creates a new exception with given error message and nested cause exception.
     *
     * @param msg Error message.
     * @param cause Cause.
     */
    public IgfsHadoopCommunicationException(String msg, Exception cause) {
        super(msg, cause);
    }

    /**
     * Creates a new exception with given throwable as a nested cause; the cause is handed to the
     * superclass constructor without a separate message.
     *
     * @param cause Non-null throwable cause.
     */
    public IgfsHadoopCommunicationException(Exception cause) {
        super(cause);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopEndpoint.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopEndpoint.java deleted file mode 100644 index d4f9a0a..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopEndpoint.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.ignite.internal.fs.hadoop; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.net.*; - -import static org.apache.ignite.configuration.IgfsConfiguration.*; - -/** - * GGFS endpoint abstraction. - */ -public class IgfsHadoopEndpoint { - /** Localhost. */ - public static final String LOCALHOST = "127.0.0.1"; - - /** GGFS name. */ - private final String ggfsName; - - /** Grid name. */ - private final String gridName; - - /** Host. */ - private final String host; - - /** Port. */ - private final int port; - - /** - * Normalize GGFS URI. - * - * @param uri URI. - * @return Normalized URI. - * @throws IOException If failed. - */ - public static URI normalize(URI uri) throws IOException { - try { - if (!F.eq(IgniteFs.GGFS_SCHEME, uri.getScheme())) - throw new IOException("Failed to normalize UIR because it has non GGFS scheme: " + uri); - - IgfsHadoopEndpoint endpoint = new IgfsHadoopEndpoint(uri.getAuthority()); - - StringBuilder sb = new StringBuilder(); - - if (endpoint.ggfs() != null) - sb.append(endpoint.ggfs()); - - if (endpoint.grid() != null) - sb.append(":").append(endpoint.grid()); - - return new URI(uri.getScheme(), sb.length() != 0 ? sb.toString() : null, endpoint.host(), endpoint.port(), - uri.getPath(), uri.getQuery(), uri.getFragment()); - } - catch (URISyntaxException | IgniteCheckedException e) { - throw new IOException("Failed to normalize URI: " + uri, e); - } - } - - /** - * Constructor. - * - * @param connStr Connection string. - * @throws IgniteCheckedException If failed to parse connection string. 
- */ - public IgfsHadoopEndpoint(@Nullable String connStr) throws IgniteCheckedException { - if (connStr == null) - connStr = ""; - - String[] tokens = connStr.split("@", -1); - - IgniteBiTuple<String, Integer> hostPort; - - if (tokens.length == 1) { - ggfsName = null; - gridName = null; - - hostPort = hostPort(connStr, connStr); - } - else if (tokens.length == 2) { - String authStr = tokens[0]; - - if (authStr.isEmpty()) { - gridName = null; - ggfsName = null; - } - else { - String[] authTokens = authStr.split(":", -1); - - ggfsName = F.isEmpty(authTokens[0]) ? null : authTokens[0]; - - if (authTokens.length == 1) - gridName = null; - else if (authTokens.length == 2) - gridName = F.isEmpty(authTokens[1]) ? null : authTokens[1]; - else - throw new IgniteCheckedException("Invalid connection string format: " + connStr); - } - - hostPort = hostPort(connStr, tokens[1]); - } - else - throw new IgniteCheckedException("Invalid connection string format: " + connStr); - - host = hostPort.get1(); - - assert hostPort.get2() != null; - - port = hostPort.get2(); - } - - /** - * Parse host and port. - * - * @param connStr Full connection string. - * @param hostPortStr Host/port connection string part. - * @return Tuple with host and port. - * @throws IgniteCheckedException If failed to parse connection string. 
- */ - private IgniteBiTuple<String, Integer> hostPort(String connStr, String hostPortStr) throws IgniteCheckedException { - String[] tokens = hostPortStr.split(":", -1); - - String host = tokens[0]; - - if (F.isEmpty(host)) - host = LOCALHOST; - - int port; - - if (tokens.length == 1) - port = DFLT_IPC_PORT; - else if (tokens.length == 2) { - String portStr = tokens[1]; - - try { - port = Integer.valueOf(portStr); - - if (port < 0 || port > 65535) - throw new IgniteCheckedException("Invalid port number: " + connStr); - } - catch (NumberFormatException e) { - throw new IgniteCheckedException("Invalid port number: " + connStr); - } - } - else - throw new IgniteCheckedException("Invalid connection string format: " + connStr); - - return F.t(host, port); - } - - /** - * @return GGFS name. - */ - @Nullable public String ggfs() { - return ggfsName; - } - - /** - * @return Grid name. - */ - @Nullable public String grid() { - return gridName; - } - - /** - * @return Host. - */ - public String host() { - return host; - } - - /** - * @return Host. - */ - public boolean isLocal() { - return F.eq(LOCALHOST, host); - } - - /** - * @return Port. 
- */ - public int port() { - return port; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgfsHadoopEndpoint.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopEx.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopEx.java deleted file mode 100644 index 189d6ee..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopEx.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.fs.hadoop; - -import org.apache.ignite.internal.util.lang.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * Extended GGFS server interface. - */ -public interface IgfsHadoopEx extends IgfsHadoop { - /** - * Adds event listener that will be invoked when connection with server is lost or remote error has occurred. 
- * If connection is closed already, callback will be invoked synchronously inside this method. - * - * @param delegate Stream delegate. - * @param lsnr Event listener. - */ - public void addEventListener(IgfsHadoopStreamDelegate delegate, IgfsHadoopStreamEventListener lsnr); - - /** - * Removes event listener that will be invoked when connection with server is lost or remote error has occurred. - * - * @param delegate Stream delegate. - */ - public void removeEventListener(IgfsHadoopStreamDelegate delegate); - - /** - * Asynchronously reads specified amount of bytes from opened input stream. - * - * @param delegate Stream delegate. - * @param pos Position to read from. - * @param len Data length to read. - * @param outBuf Optional output buffer. If buffer length is less then {@code len}, all remaining - * bytes will be read into new allocated buffer of length {len - outBuf.length} and this buffer will - * be the result of read future. - * @param outOff Output offset. - * @param outLen Output length. - * @return Read data. - */ - public GridPlainFuture<byte[]> readData(IgfsHadoopStreamDelegate delegate, long pos, int len, - @Nullable final byte[] outBuf, final int outOff, final int outLen); - - /** - * Writes data to the stream with given streamId. This method does not return any future since - * no response to write request is sent. - * - * @param delegate Stream delegate. - * @param data Data to write. - * @param off Offset. - * @param len Length. - * @throws IOException If failed. - */ - public void writeData(IgfsHadoopStreamDelegate delegate, byte[] data, int off, int len) throws IOException; - - /** - * Close server stream. - * - * @param delegate Stream delegate. - * @throws IOException If failed. - */ - public void closeStream(IgfsHadoopStreamDelegate delegate) throws IOException; - - /** - * Flush output stream. - * - * @param delegate Stream delegate. - * @throws IOException If failed. 
- */ - public void flush(IgfsHadoopStreamDelegate delegate) throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopFSProperties.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopFSProperties.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopFSProperties.java deleted file mode 100644 index 7704a5d..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopFSProperties.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.fs.hadoop; - -import org.apache.hadoop.fs.permission.*; -import org.apache.ignite.*; - -import java.util.*; - -import static org.apache.ignite.IgniteFs.*; - -/** - * Hadoop file system properties. - */ -class IgfsHadoopFSProperties { - /** Username. */ - private String usrName; - - /** Group name. */ - private String grpName; - - /** Permissions. */ - private FsPermission perm; - - /** - * Constructor. - * - * @param props Properties. 
- * @throws IgniteException In case of error. - */ - IgfsHadoopFSProperties(Map<String, String> props) throws IgniteException { - usrName = props.get(PROP_USER_NAME); - grpName = props.get(PROP_GROUP_NAME); - - String permStr = props.get(PROP_PERMISSION); - - if (permStr != null) { - try { - perm = new FsPermission((short)Integer.parseInt(permStr, 8)); - } - catch (NumberFormatException ignore) { - throw new IgniteException("Permissions cannot be parsed: " + permStr); - } - } - } - - /** - * Get user name. - * - * @return User name. - */ - String userName() { - return usrName; - } - - /** - * Get group name. - * - * @return Group name. - */ - String groupName() { - return grpName; - } - - /** - * Get permission. - * - * @return Permission. - */ - FsPermission permission() { - return perm; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopFileSystemWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopFileSystemWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopFileSystemWrapper.java deleted file mode 100644 index e2f55f0..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopFileSystemWrapper.java +++ /dev/null @@ -1,413 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.fs.hadoop;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.permission.*;
import org.apache.hadoop.ipc.*;
import org.apache.ignite.*;
import org.apache.ignite.igfs.*;
import org.apache.ignite.internal.processors.fs.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;

import java.io.*;
import java.net.*;
import java.util.*;

/**
 * Adapter to use any Hadoop file system {@link org.apache.hadoop.fs.FileSystem} as {@link org.apache.ignite.igfs.Igfs}.
 * <p>
 * Every operation is delegated to the wrapped Hadoop file system; {@link IOException}s raised by it are
 * converted to GGFS exceptions.
 */
public class IgfsHadoopFileSystemWrapper implements Igfs, AutoCloseable {
    /** Property name for path to Hadoop configuration. */
    public static final String SECONDARY_FS_CONFIG_PATH = "SECONDARY_FS_CONFIG_PATH";

    /** Property name for URI of file system. */
    public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI";

    /** Hadoop file system. */
    private final FileSystem fileSys;

    /** Properties of file system */
    private final Map<String, String> props = new HashMap<>();

    /**
     * Constructor.
     *
     * @param uri URI of file system. If {@code null}, the file system is resolved from configuration only.
     * @param cfgPath Additional path to Hadoop configuration.
     * @throws IgniteCheckedException In case of error.
     */
    public IgfsHadoopFileSystemWrapper(@Nullable String uri, @Nullable String cfgPath) throws IgniteCheckedException {
        Configuration cfg = new Configuration();

        if (cfgPath != null)
            cfg.addResource(U.resolveIgniteUrl(cfgPath));

        try {
            fileSys = uri == null ? FileSystem.get(cfg) : FileSystem.get(new URI(uri), cfg);
        }
        catch (IOException | URISyntaxException e) {
            throw new IgniteCheckedException(e);
        }

        // Publish the URI actually reported by the file system, always with a trailing slash.
        uri = fileSys.getUri().toString();

        if (!uri.endsWith("/"))
            uri += "/";

        props.put(SECONDARY_FS_CONFIG_PATH, cfgPath);
        props.put(SECONDARY_FS_URI, uri);
    }

    /**
     * Convert GGFS path into Hadoop path.
     *
     * @param path GGFS path.
     * @return Hadoop path built from the wrapped file system's scheme and authority.
     */
    private Path convert(IgfsPath path) {
        URI uri = fileSys.getUri();

        return new Path(uri.getScheme(), uri.getAuthority(), path.toString());
    }

    /**
     * Parses optional properties; {@code null} is treated as an empty map.
     *
     * @param props Properties or {@code null}.
     * @return Parsed Hadoop properties.
     */
    private static IgfsHadoopFSProperties fsProperties(@Nullable Map<String, String> props) {
        return new IgfsHadoopFSProperties(props != null ? props : Collections.<String, String>emptyMap());
    }

    /**
     * Creates the exception thrown when a listed path does not exist.
     *
     * @param path Path that was not found.
     * @return Exception to throw.
     */
    private static IgfsFileNotFoundException pathNotFound(IgfsPath path) {
        return new IgfsFileNotFoundException("Failed to list files (path not found): " + path);
    }

    /**
     * Heuristically checks if exception was caused by invalid HDFS version and returns appropriate exception.
     *
     * @param e Exception to check.
     * @param detailMsg Detailed error message.
     * @return Appropriate exception.
     */
    private IgfsException handleSecondaryFsError(IOException e, String detailMsg) {
        boolean wrongVer = X.hasCause(e, RemoteException.class) ||
            (e.getMessage() != null && e.getMessage().contains("Failed on local"));

        if (wrongVer)
            return new IgfsInvalidHdfsVersionException("HDFS version you are connecting to differs from local " +
                "version.", e);

        return cast(detailMsg, e);
    }

    /**
     * Cast IO exception to GGFS exception.
     *
     * @param msg Error message used for exception types that accept one.
     * @param e IO exception.
     * @return GGFS exception.
     */
    public static IgfsException cast(String msg, IOException e) {
        if (e instanceof FileNotFoundException)
            return new IgfsFileNotFoundException(e);
        else if (e instanceof ParentNotDirectoryException)
            return new IgfsParentNotDirectoryException(msg, e);
        else if (e instanceof PathIsNotEmptyDirectoryException)
            return new IgfsDirectoryNotEmptyException(e);
        else if (e instanceof PathExistsException)
            return new IgfsPathAlreadyExistsException(msg, e);
        else
            return new IgfsException(msg, e);
    }

    /**
     * Convert Hadoop FileStatus properties to map.
     *
     * @param status File status.
     * @return GGFS attributes: permission (4-digit octal), owner and group.
     */
    private static Map<String, String> properties(FileStatus status) {
        FsPermission perm = status.getPermission();

        if (perm == null)
            perm = FsPermission.getDefault();

        return F.asMap(PROP_PERMISSION, String.format("%04o", perm.toShort()), PROP_USER_NAME, status.getOwner(),
            PROP_GROUP_NAME, status.getGroup());
    }

    /** {@inheritDoc} */
    @Override public boolean exists(IgfsPath path) {
        try {
            return fileSys.exists(convert(path));
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]");
        }
    }

    /** {@inheritDoc} */
    @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) {
        IgfsHadoopFSProperties props0 = new IgfsHadoopFSProperties(props);

        try {
            if (props0.userName() != null || props0.groupName() != null)
                fileSys.setOwner(convert(path), props0.userName(), props0.groupName());

            if (props0.permission() != null)
                fileSys.setPermission(convert(path), props0.permission());
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]");
        }

        //Result is not used in case of secondary FS.
        return null;
    }

    /** {@inheritDoc} */
    @Override public void rename(IgfsPath src, IgfsPath dest) {
        // Delegate to the secondary file system.
        try {
            if (!fileSys.rename(convert(src), convert(dest)))
                throw new IgfsException("Failed to rename (secondary file system returned false) " +
                    "[src=" + src + ", dest=" + dest + ']');
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to rename file [src=" + src + ", dest=" + dest + ']');
        }
    }

    /** {@inheritDoc} */
    @Override public boolean delete(IgfsPath path, boolean recursive) {
        try {
            return fileSys.delete(convert(path), recursive);
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]");
        }
    }

    /** {@inheritDoc} */
    @Override public void mkdirs(IgfsPath path) {
        try {
            if (!fileSys.mkdirs(convert(path)))
                throw new IgniteException("Failed to make directories [path=" + path + "]");
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + "]");
        }
    }

    /** {@inheritDoc} */
    @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) {
        // Props are optional here: parse them null-safely, exactly as create(...) does.
        FsPermission perm = fsProperties(props).permission();

        try {
            if (!fileSys.mkdirs(convert(path), perm))
                throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]");
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + ", props=" + props + "]");
        }
    }

    /** {@inheritDoc} */
    @Override public Collection<IgfsPath> listPaths(IgfsPath path) {
        try {
            FileStatus[] statuses = fileSys.listStatus(convert(path));

            if (statuses == null)
                throw pathNotFound(path);

            Collection<IgfsPath> res = new ArrayList<>(statuses.length);

            for (FileStatus status : statuses)
                res.add(new IgfsPath(path, status.getPath().getName()));

            return res;
        }
        catch (FileNotFoundException ignored) {
            // The Hadoop exception is deliberately replaced with the GGFS-specific one.
            throw pathNotFound(path);
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
        }
    }

    /** {@inheritDoc} */
    @Override public Collection<IgfsFile> listFiles(IgfsPath path) {
        try {
            FileStatus[] statuses = fileSys.listStatus(convert(path));

            if (statuses == null)
                throw pathNotFound(path);

            Collection<IgfsFile> res = new ArrayList<>(statuses.length);

            for (FileStatus status : statuses) {
                IgfsFileInfo fsInfo = status.isDirectory() ? new IgfsFileInfo(true, properties(status)) :
                    new IgfsFileInfo((int)status.getBlockSize(), status.getLen(), null, null, false,
                    properties(status));

                res.add(new IgfsFileImpl(new IgfsPath(path, status.getPath().getName()), fsInfo, 1));
            }

            return res;
        }
        catch (FileNotFoundException ignored) {
            // The Hadoop exception is deliberately replaced with the GGFS-specific one.
            throw pathNotFound(path);
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
        }
    }

    /** {@inheritDoc} */
    @Override public IgfsReader open(IgfsPath path, int bufSize) {
        return new IgfsHadoopReader(fileSys, convert(path), bufSize);
    }

    /** {@inheritDoc} */
    @Override public OutputStream create(IgfsPath path, boolean overwrite) {
        try {
            return fileSys.create(convert(path), overwrite);
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]");
        }
    }

    /** {@inheritDoc} */
    @Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication,
        long blockSize, @Nullable Map<String, String> props) {
        IgfsHadoopFSProperties props0 = fsProperties(props);

        try {
            return fileSys.create(convert(path), props0.permission(), overwrite, bufSize, (short)replication, blockSize,
                null);
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props +
                ", overwrite=" + overwrite + ", bufSize=" + bufSize + ", replication=" + replication +
                ", blockSize=" + blockSize + "]");
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Note: this implementation passes only the path and buffer size to the Hadoop file system;
     * {@code create} and {@code props} are not used.
     */
    @Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
        @Nullable Map<String, String> props) {
        try {
            return fileSys.append(convert(path), bufSize);
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]");
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Returns {@code null} when the Hadoop file system reports that the path does not exist.
     */
    @Override public IgfsFile info(final IgfsPath path) {
        try {
            final FileStatus status = fileSys.getFileStatus(convert(path));

            if (status == null)
                return null;

            final Map<String, String> props = properties(status);

            // Lightweight view backed by the Hadoop file status captured above.
            return new IgfsFile() {
                @Override public IgfsPath path() {
                    return path;
                }

                @Override public boolean isFile() {
                    return status.isFile();
                }

                @Override public boolean isDirectory() {
                    return status.isDirectory();
                }

                @Override public int blockSize() {
                    return (int)status.getBlockSize();
                }

                @Override public long groupBlockSize() {
                    return status.getBlockSize();
                }

                @Override public long accessTime() {
                    return status.getAccessTime();
                }

                @Override public long modificationTime() {
                    return status.getModificationTime();
                }

                @Override public String property(String name) throws IllegalArgumentException {
                    String val = props.get(name);

                    if (val == null)
                        throw new IllegalArgumentException("File property not found [path=" + path + ", name=" + name + ']');

                    return val;
                }

                @Nullable @Override public String property(String name, @Nullable String dfltVal) {
                    String val = props.get(name);

                    return val == null ? dfltVal : val;
                }

                @Override public long length() {
                    return status.getLen();
                }

                /** {@inheritDoc} */
                @Override public Map<String, String> properties() {
                    return props;
                }
            };

        }
        catch (FileNotFoundException ignore) {
            // Missing file is reported as null rather than as an error.
            return null;
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to get file status [path=" + path + "]");
        }
    }

    /** {@inheritDoc} */
    @Override public long usedSpaceSize() {
        try {
            return fileSys.getContentSummary(new Path(fileSys.getUri())).getSpaceConsumed();
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to get used space size of file system.");
        }
    }

    /** {@inheritDoc} */
    @Nullable @Override public Map<String, String> properties() {
        return props;
    }

    /** {@inheritDoc} */
    @Override public void close() throws IgniteCheckedException {
        try {
            fileSys.close();
        }
        catch (IOException e) {
            throw new IgniteCheckedException(e);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopFuture.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopFuture.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopFuture.java deleted file mode 100644 index d02ab11..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopFuture.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.fs.hadoop; - -import org.apache.ignite.internal.util.lang.*; -import org.jetbrains.annotations.*; - -/** - * GGFS client future that holds response parse closure. - */ -public class IgfsHadoopFuture<T> extends GridPlainFutureAdapter<T> { - /** Output buffer. */ - private byte[] outBuf; - - /** Output offset. */ - private int outOff; - - /** Output length. */ - private int outLen; - - /** Read future flag. */ - private boolean read; - - /** - * @return Output buffer. - */ - public byte[] outputBuffer() { - return outBuf; - } - - /** - * @param outBuf Output buffer. - */ - public void outputBuffer(@Nullable byte[] outBuf) { - this.outBuf = outBuf; - } - - /** - * @return Offset in output buffer to write from. - */ - public int outputOffset() { - return outOff; - } - - /** - * @param outOff Offset in output buffer to write from. - */ - public void outputOffset(int outOff) { - this.outOff = outOff; - } - - /** - * @return Length to write to output buffer. - */ - public int outputLength() { - return outLen; - } - - /** - * @param outLen Length to write to output buffer. - */ - public void outputLength(int outLen) { - this.outLen = outLen; - } - - /** - * @param read {@code True} if this is a read future. - */ - public void read(boolean read) { - this.read = read; - } - - /** - * @return {@code True} if this is a read future. 
- */ - public boolean read() { - return read; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopInProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopInProc.java deleted file mode 100644 index 25d8c1d..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopInProc.java +++ /dev/null @@ -1,409 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.fs.hadoop; - -import org.apache.commons.logging.*; -import org.apache.ignite.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.processors.fs.*; -import org.apache.ignite.internal.util.lang.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; - -/** - * Communication with grid in the same process. - */ -public class IgfsHadoopInProc implements IgfsHadoopEx { - /** Target GGFS. 
*/ - private final IgfsEx ggfs; - - /** Buffer size. */ - private final int bufSize; - - /** Event listeners. */ - private final Map<IgfsHadoopStreamDelegate, IgfsHadoopStreamEventListener> lsnrs = - new ConcurrentHashMap<>(); - - /** Logger. */ - private final Log log; - - /** - * Constructor. - * - * @param ggfs Target GGFS. - * @param log Log. - */ - public IgfsHadoopInProc(IgfsEx ggfs, Log log) { - this.ggfs = ggfs; - this.log = log; - - bufSize = ggfs.configuration().getBlockSize() * 2; - } - - /** {@inheritDoc} */ - @Override public IgfsHandshakeResponse handshake(String logDir) { - ggfs.clientLogDirectory(logDir); - - return new IgfsHandshakeResponse(ggfs.name(), ggfs.proxyPaths(), ggfs.groupBlockSize(), - ggfs.globalSampling()); - } - - /** {@inheritDoc} */ - @Override public void close(boolean force) { - // Perform cleanup. - for (IgfsHadoopStreamEventListener lsnr : lsnrs.values()) { - try { - lsnr.onClose(); - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to notify stream event listener", e); - } - } - } - - /** {@inheritDoc} */ - @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException { - try { - return ggfs.info(path); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to get file info because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException { - try { - return ggfs.update(path, props); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to update file because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException { - try { - 
ggfs.setTimes(path, accessTime, modificationTime); - - return true; - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to set path times because Grid is stopping: " + - path); - } - } - - /** {@inheritDoc} */ - @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException { - try { - ggfs.rename(src, dest); - - return true; - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to rename path because Grid is stopping: " + src); - } - } - - /** {@inheritDoc} */ - @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException { - try { - return ggfs.delete(path, recursive); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to delete path because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsStatus fsStatus() throws IgniteCheckedException { - try { - return ggfs.globalSpace(); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to get file system status because Grid is " + - "stopping."); - } - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException { - try { - return ggfs.listPaths(path); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to list paths because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException { - try { - return ggfs.listFiles(path); - } - catch (IgniteException e) { - throw new 
IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to list files because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException { - try { - ggfs.mkdirs(path, props); - - return true; - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to create directory because Grid is stopping: " + - path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException { - try { - return ggfs.summary(path); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to get content summary because Grid is stopping: " + - path); - } - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) - throws IgniteCheckedException { - try { - return ggfs.affinity(path, start, len); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to get affinity because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsHadoopStreamDelegate open(IgfsPath path) throws IgniteCheckedException { - try { - IgfsInputStreamAdapter stream = ggfs.open(path, bufSize); - - return new IgfsHadoopStreamDelegate(this, stream, stream.fileInfo().length()); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to open file because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsHadoopStreamDelegate 
open(IgfsPath path, int seqReadsBeforePrefetch) - throws IgniteCheckedException { - try { - IgfsInputStreamAdapter stream = ggfs.open(path, bufSize, seqReadsBeforePrefetch); - - return new IgfsHadoopStreamDelegate(this, stream, stream.fileInfo().length()); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to open file because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsHadoopStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate, - int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException { - try { - IgfsOutputStream stream = ggfs.create(path, bufSize, overwrite, - colocate ? ggfs.nextAffinityKey() : null, replication, blockSize, props); - - return new IgfsHadoopStreamDelegate(this, stream); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to create file because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsHadoopStreamDelegate append(IgfsPath path, boolean create, - @Nullable Map<String, String> props) throws IgniteCheckedException { - try { - IgfsOutputStream stream = ggfs.append(path, bufSize, create, props); - - return new IgfsHadoopStreamDelegate(this, stream); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to append file because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public GridPlainFuture<byte[]> readData(IgfsHadoopStreamDelegate delegate, long pos, int len, - @Nullable byte[] outBuf, int outOff, int outLen) { - IgfsInputStreamAdapter stream = delegate.target(); - - try { - byte[] res = null; - - if (outBuf != null) { - int outTailLen = 
outBuf.length - outOff; - - if (len <= outTailLen) - stream.readFully(pos, outBuf, outOff, len); - else { - stream.readFully(pos, outBuf, outOff, outTailLen); - - int remainderLen = len - outTailLen; - - res = new byte[remainderLen]; - - stream.readFully(pos, res, 0, remainderLen); - } - } else { - res = new byte[len]; - - stream.readFully(pos, res, 0, len); - } - - return new GridPlainFutureAdapter<>(res); - } - catch (IllegalStateException | IOException e) { - IgfsHadoopStreamEventListener lsnr = lsnrs.get(delegate); - - if (lsnr != null) - lsnr.onError(e.getMessage()); - - return new GridPlainFutureAdapter<>(e); - } - } - - /** {@inheritDoc} */ - @Override public void writeData(IgfsHadoopStreamDelegate delegate, byte[] data, int off, int len) - throws IOException { - try { - IgfsOutputStream stream = delegate.target(); - - stream.write(data, off, len); - } - catch (IllegalStateException | IOException e) { - IgfsHadoopStreamEventListener lsnr = lsnrs.get(delegate); - - if (lsnr != null) - lsnr.onError(e.getMessage()); - - if (e instanceof IllegalStateException) - throw new IOException("Failed to write data to GGFS stream because Grid is stopping.", e); - else - throw e; - } - } - - /** {@inheritDoc} */ - @Override public void flush(IgfsHadoopStreamDelegate delegate) throws IOException { - try { - IgfsOutputStream stream = delegate.target(); - - stream.flush(); - } - catch (IllegalStateException | IOException e) { - IgfsHadoopStreamEventListener lsnr = lsnrs.get(delegate); - - if (lsnr != null) - lsnr.onError(e.getMessage()); - - if (e instanceof IllegalStateException) - throw new IOException("Failed to flush data to GGFS stream because Grid is stopping.", e); - else - throw e; - } - } - - /** {@inheritDoc} */ - @Override public void closeStream(IgfsHadoopStreamDelegate desc) throws IOException { - Closeable closeable = desc.target(); - - try { - closeable.close(); - } - catch (IllegalStateException e) { - throw new IOException("Failed to close GGFS stream because 
Grid is stopping.", e); - } - } - - /** {@inheritDoc} */ - @Override public void addEventListener(IgfsHadoopStreamDelegate delegate, - IgfsHadoopStreamEventListener lsnr) { - IgfsHadoopStreamEventListener lsnr0 = lsnrs.put(delegate, lsnr); - - assert lsnr0 == null || lsnr0 == lsnr; - - if (log.isDebugEnabled()) - log.debug("Added stream event listener [delegate=" + delegate + ']'); - } - - /** {@inheritDoc} */ - @Override public void removeEventListener(IgfsHadoopStreamDelegate delegate) { - IgfsHadoopStreamEventListener lsnr0 = lsnrs.remove(delegate); - - if (lsnr0 != null && log.isDebugEnabled()) - log.debug("Removed stream event listener [delegate=" + delegate + ']'); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopInputStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopInputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopInputStream.java deleted file mode 100644 index c26a5ab..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopInputStream.java +++ /dev/null @@ -1,626 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.fs.hadoop; - -import org.apache.commons.logging.*; -import org.apache.hadoop.fs.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.fs.common.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * GGFS input stream wrapper for hadoop interfaces. - */ -@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") -public final class IgfsHadoopInputStream extends InputStream implements Seekable, PositionedReadable, - IgfsHadoopStreamEventListener { - /** Minimum buffer size. */ - private static final int MIN_BUF_SIZE = 4 * 1024; - - /** Server stream delegate. */ - private IgfsHadoopStreamDelegate delegate; - - /** Stream ID used by logger. */ - private long logStreamId; - - /** Stream position. */ - private long pos; - - /** Stream read limit. */ - private long limit; - - /** Mark position. */ - private long markPos = -1; - - /** Prefetch buffer. */ - private DoubleFetchBuffer buf = new DoubleFetchBuffer(); - - /** Buffer half size for double-buffering. */ - private int bufHalfSize; - - /** Closed flag. */ - private volatile boolean closed; - - /** Flag set if stream was closed due to connection breakage. */ - private boolean connBroken; - - /** Logger. */ - private Log log; - - /** Client logger. */ - private IgfsLogger clientLog; - - /** Read time. */ - private long readTime; - - /** User time. */ - private long userTime; - - /** Last timestamp. 
/**
 * Reads a single byte at the current stream position and advances the position by one.
 *
 * @return The byte as an unsigned value, or {@code -1} if the position has reached the read limit.
 * @throws IOException If the stream is closed or the underlying read failed.
 */
@Override public synchronized int read() throws IOException {
    checkClosed();

    // Switch time accounting from "user" to "read" for the duration of the call.
    readStart();

    try {
        if (eof())
            return -1;

        // Make sure the prefetch buffer covers the current position before reading from it.
        buf.refreshAhead(pos);

        int res = buf.atPosition(pos);

        pos++;
        total++;

        // The position moved: let the buffer flip or prefetch for the new position.
        buf.refreshAhead(pos);

        return res;
    }
    catch (IgniteCheckedException e) {
        throw IgfsHadoopUtils.cast(e);
    }
    finally {
        readEnd();
    }
}
buf.refreshAhead(pos); - - return read; - } - catch (IgniteCheckedException e) { - throw IgfsHadoopUtils.cast(e); - } - finally { - readEnd(); - } - } - - /** {@inheritDoc} */ - @Override public synchronized long skip(long n) throws IOException { - checkClosed(); - - if (clientLog.isLogEnabled()) - clientLog.logSkip(logStreamId, n); - - long oldPos = pos; - - if (pos + n <= limit) - pos += n; - else - pos = limit; - - buf.refreshAhead(pos); - - return pos - oldPos; - } - - /** {@inheritDoc} */ - @Override public synchronized int available() throws IOException { - checkClosed(); - - int available = buf.available(pos); - - assert available >= 0; - - return available; - } - - /** {@inheritDoc} */ - @Override public synchronized void close() throws IOException { - if (!closed) { - readStart(); - - if (log.isDebugEnabled()) - log.debug("Closing input stream: " + delegate); - - delegate.hadoop().closeStream(delegate); - - readEnd(); - - if (clientLog.isLogEnabled()) - clientLog.logCloseIn(logStreamId, userTime, readTime, total); - - markClosed(false); - - if (log.isDebugEnabled()) - log.debug("Closed stream [delegate=" + delegate + ", readTime=" + readTime + - ", userTime=" + userTime + ']'); - } - } - - /** {@inheritDoc} */ - @Override public synchronized void mark(int readLimit) { - markPos = pos; - - if (clientLog.isLogEnabled()) - clientLog.logMark(logStreamId, readLimit); - } - - /** {@inheritDoc} */ - @Override public synchronized void reset() throws IOException { - checkClosed(); - - if (clientLog.isLogEnabled()) - clientLog.logReset(logStreamId); - - if (markPos == -1) - throw new IOException("Stream was not marked."); - - pos = markPos; - - buf.refreshAhead(pos); - } - - /** {@inheritDoc} */ - @Override public boolean markSupported() { - return true; - } - - /** {@inheritDoc} */ - @Override public synchronized int read(long position, byte[] buf, int off, int len) throws IOException { - long remaining = limit - position; - - int read = (int)Math.min(len, 
remaining); - - // Return -1 at EOF. - if (read == 0) - return -1; - - readFully(position, buf, off, read); - - return read; - } - - /** {@inheritDoc} */ - @Override public synchronized void readFully(long position, byte[] buf, int off, int len) throws IOException { - long remaining = limit - position; - - checkClosed(); - - if (len > remaining) - throw new EOFException("End of stream reached before data was fully read."); - - readStart(); - - try { - int read = this.buf.flatten(buf, position, off, len); - - total += read; - - if (read != len) { - int readAmt = len - read; - - delegate.hadoop().readData(delegate, position + read, readAmt, buf, off + read, readAmt).get(); - - total += readAmt; - } - - if (clientLog.isLogEnabled()) - clientLog.logRandomRead(logStreamId, position, len); - } - catch (IgniteCheckedException e) { - throw IgfsHadoopUtils.cast(e); - } - finally { - readEnd(); - } - } - - /** {@inheritDoc} */ - @Override public void readFully(long position, byte[] buf) throws IOException { - readFully(position, buf, 0, buf.length); - } - - /** {@inheritDoc} */ - @Override public synchronized void seek(long pos) throws IOException { - A.ensure(pos >= 0, "position must be non-negative"); - - checkClosed(); - - if (clientLog.isLogEnabled()) - clientLog.logSeek(logStreamId, pos); - - if (pos > limit) - pos = limit; - - if (log.isDebugEnabled()) - log.debug("Seek to position [delegate=" + delegate + ", pos=" + pos + ", oldPos=" + this.pos + ']'); - - this.pos = pos; - - buf.refreshAhead(pos); - } - - /** {@inheritDoc} */ - @Override public synchronized long getPos() { - return pos; - } - - /** {@inheritDoc} */ - @Override public synchronized boolean seekToNewSource(long targetPos) { - return false; - } - - /** {@inheritDoc} */ - @Override public void onClose() { - markClosed(true); - } - - /** {@inheritDoc} */ - @Override public void onError(String errMsg) { - // No-op. - } - - /** - * Marks stream as closed. 
- * - * @param connBroken {@code True} if connection with server was lost. - */ - private void markClosed(boolean connBroken) { - // It is ok to have race here. - if (!closed) { - closed = true; - - this.connBroken = connBroken; - - delegate.hadoop().removeEventListener(delegate); - } - } - - /** - * @throws IOException If check failed. - */ - private void checkClosed() throws IOException { - if (closed) { - if (connBroken) - throw new IOException("Server connection was lost."); - else - throw new IOException("Stream is closed."); - } - } - - /** - * @return {@code True} if end of stream reached. - */ - private boolean eof() { - return limit == pos; - } - - /** - * Asynchronous prefetch buffer. - */ - private static class FetchBufferPart { - /** Read future. */ - private GridPlainFuture<byte[]> readFut; - - /** Position of cached chunk in file. */ - private long pos; - - /** Prefetch length. Need to store as read future result might be not available yet. */ - private int len; - - /** - * Creates fetch buffer part. - * - * @param readFut Read future for this buffer. - * @param pos Read position. - * @param len Chunk length. - */ - private FetchBufferPart(GridPlainFuture<byte[]> readFut, long pos, int len) { - this.readFut = readFut; - this.pos = pos; - this.len = len; - } - - /** - * Copies cached data if specified position matches cached region. - * - * @param dst Destination buffer. - * @param pos Read position in file. - * @param dstOff Offset in destination buffer from which start writing. - * @param len Maximum number of bytes to copy. - * @return Number of bytes copied. - * @throws IgniteCheckedException If read future failed. - */ - public int flatten(byte[] dst, long pos, int dstOff, int len) throws IgniteCheckedException { - // If read start position is within cached boundaries. 
- if (contains(pos)) { - byte[] data = readFut.get(); - - int srcPos = (int)(pos - this.pos); - int cpLen = Math.min(len, data.length - srcPos); - - U.arrayCopy(data, srcPos, dst, dstOff, cpLen); - - return cpLen; - } - - return 0; - } - - /** - * @return {@code True} if data is ready to be read. - */ - public boolean ready() { - return readFut.isDone(); - } - - /** - * Checks if current buffer part contains given position. - * - * @param pos Position to check. - * @return {@code True} if position matches buffer region. - */ - public boolean contains(long pos) { - return this.pos <= pos && this.pos + len > pos; - } - } - - /** - * Read-ahead cache made of two adjacent buffer parts, each fetched asynchronously. - */ - private class DoubleFetchBuffer { - /** Leading buffer part of the cached region. */ - private FetchBufferPart first; - - /** Trailing buffer part; when present it starts right after the first one. */ - private FetchBufferPart second; - - /** - * Copies fetched data from both buffers to destination array if cached region matched read position. - * - * @param dst Destination buffer. - * @param pos Read position in file. - * @param dstOff Destination buffer offset. - * @param len Maximum number of bytes to copy. - * @return Number of bytes copied. - * @throws IgniteCheckedException If any read operation failed. - */ - public int flatten(byte[] dst, long pos, int dstOff, int len) throws IgniteCheckedException { - assert dstOff >= 0; - assert dstOff + len <= dst.length : "Invalid indices [dst.length=" + dst.length + ", dstOff=" + dstOff + - ", len=" + len + ']'; - - int bytesCopied = 0; - - if (first != null) { - bytesCopied += first.flatten(dst, pos, dstOff, len); - - if (bytesCopied != len && second != null) { - assert second.pos == first.pos + first.len; - - bytesCopied += second.flatten(dst, pos + bytesCopied, dstOff + bytesCopied, len - bytesCopied); - } - } - - return bytesCopied; - } - - /** - * Gets byte at specified position in buffer. - * - * @param pos Stream position. - * @return Read byte. - * @throws IgniteCheckedException If read failed. 
- */ - public int atPosition(long pos) throws IgniteCheckedException { - // Should not reach here if stream contains no data. - assert first != null; - - if (first.contains(pos)) { - byte[] bytes = first.readFut.get(); - - return bytes[((int)(pos - first.pos))] & 0xFF; - } - else { - assert second != null; - assert second.contains(pos); - - byte[] bytes = second.readFut.get(); - - return bytes[((int)(pos - second.pos))] & 0xFF; - } - } - - /** - * Starts asynchronous buffer refresh if needed, depending on current position. - * - * @param pos Current stream position. - */ - public void refreshAhead(long pos) { - if (fullPrefetch(pos)) { - first = fetch(pos, bufHalfSize); - second = fetch(pos + bufHalfSize, bufHalfSize); - } - else if (needFlip(pos)) { - first = second; - - second = fetch(first.pos + first.len, bufHalfSize); - } - } - - /** - * Counts bytes located between the given position and the end of the already fetched (ready) region. - * - * @param pos Position from which read is expected. - * @return Number of bytes available to be read without blocking. - */ - public int available(long pos) { - int available = 0; - - if (first != null) { - if (first.contains(pos)) { - if (first.ready()) { - // Remaining tail of the first part, not the bytes that were already consumed from it. - available += (first.pos + first.len - pos); - - if (second != null && second.ready()) - available += second.len; - } - } - else { - if (second != null && second.contains(pos) && second.ready()) - available += (second.pos + second.len - pos); - } - } - - return available; - } - - /** - * Checks if position has moved into the second buffer part, so that the first part can be forgotten. - * - * @param pos Current position. - * @return {@code True} if need flip buffers. - */ - private boolean needFlip(long pos) { - // Flip as soon as the position is inside the second buffer part. - return second != null && second.contains(pos); - } - - /** - * Determines if all cached bytes should be discarded and new region should be - * prefetched. - * - * @param curPos Current stream position. - * @return {@code True} if need to refresh both blocks. - */ - private boolean fullPrefetch(long curPos) { - // If no data was prefetched yet, return true. 
- return first == null || curPos < first.pos || (second != null && curPos >= second.pos + second.len); - } - - /** - * Starts asynchronous fetch for given region. - * - * @param pos Position to read from. - * @param size Number of bytes to read. - * @return Fetch buffer part. - */ - private FetchBufferPart fetch(long pos, int size) { - long remaining = limit - pos; - - size = (int)Math.min(size, remaining); - - return size <= 0 ? null : - new FetchBufferPart(delegate.hadoop().readData(delegate, pos, size, null, 0, 0), pos, size); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopIo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopIo.java deleted file mode 100644 index 200841a..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopIo.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.fs.hadoop; - -import org.apache.ignite.*; -import org.apache.ignite.internal.fs.common.*; -import org.apache.ignite.internal.util.lang.*; -import org.jetbrains.annotations.*; - -/** - * IO abstraction layer for GGFS client. Two kinds of messages are expected to be sent: requests with a response - * and requests without a response. - */ -public interface IgfsHadoopIo { - /** - * Sends given GGFS client message and asynchronously awaits the response. - * - * @param msg Message to send. - * @return Future that will be completed. - * @throws IgniteCheckedException If a message cannot be sent (connection is broken or client was closed). - */ - public GridPlainFuture<IgfsMessage> send(IgfsMessage msg) throws IgniteCheckedException; - - /** - * Sends given GGFS client message and asynchronously awaits the response. When IO detects the beginning of - * the response for given message, it stops reading data and passes input stream to closure which can read - * response in a specific way. - * - * @param msg Message to send. - * @param outBuf Output buffer. If {@code null}, the output buffer is not used. - * @param outOff Output buffer offset. - * @param outLen Output buffer length. - * @param <T> Type of the value the returned future is parameterized with. - * @return Future that will be completed when response is returned from closure. - * @throws IgniteCheckedException If a message cannot be sent (connection is broken or client was closed). - */ - public <T> GridPlainFuture<T> send(IgfsMessage msg, @Nullable byte[] outBuf, int outOff, int outLen) - throws IgniteCheckedException; - - /** - * Sends given message and does not wait for a response. - * - * @param msg Message to send. - * @throws IgniteCheckedException If send failed. - */ - public void sendPlain(IgfsMessage msg) throws IgniteCheckedException; - - /** - * Adds event listener that will be invoked when connection with server is lost or remote error has occurred. - * If connection is closed already, callback will be invoked synchronously inside this method. 
- * - * @param lsnr Event listener. - */ - public void addEventListener(IgfsHadoopIpcIoListener lsnr); - - /** - * Removes event listener that would be invoked when connection with server is lost or remote error has occurred. - * - * @param lsnr Event listener. - */ - public void removeEventListener(IgfsHadoopIpcIoListener lsnr); -}
