http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java new file mode 100644 index 0000000..bd8ed2d --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java @@ -0,0 +1,1076 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop.fs.v2; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.AbstractFileSystem; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileChecksum; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FsServerDefaults; +import org.apache.hadoop.fs.FsStatus; +import org.apache.hadoop.fs.InvalidPathException; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.util.Progressable; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.configuration.FileSystemConfiguration; +import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory; +import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; +import org.apache.ignite.igfs.IgfsBlockLocation; +import org.apache.ignite.igfs.IgfsFile; +import org.apache.ignite.igfs.IgfsMode; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.internal.igfs.common.IgfsLogger; +import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint; +import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsInputStream; +import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsOutputStream; +import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProxyInputStream; +import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProxyOutputStream; +import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsStreamDelegate; +import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsWrapper; +import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse; +import org.apache.ignite.internal.processors.igfs.IgfsModeResolver; +import 
org.apache.ignite.internal.processors.igfs.IgfsPaths; +import org.apache.ignite.internal.processors.igfs.IgfsStatus; +import org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lifecycle.LifecycleAware; +import org.jetbrains.annotations.Nullable; + +import java.io.BufferedOutputStream; +import java.io.Closeable; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_BATCH_SIZE; +import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_DIR; +import static org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.getFsHadoopUser; +import static org.apache.ignite.igfs.IgfsMode.PROXY; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_COLOCATED_WRITES; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_BATCH_SIZE; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_DIR; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_ENABLED; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_PREFER_LOCAL_WRITES; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH; +import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.parameter; +import static org.apache.ignite.internal.processors.igfs.IgfsEx.IGFS_SCHEME; + +/** + * {@code IGFS} Hadoop 2.x file system driver over the file system API. To use + * {@code IGFS} as a Hadoop file system, you should configure this class + * in Hadoop's {@code core-site.xml} as follows: + * <pre name="code" class="xml"> + * <property> + * <name>fs.default.name</name> + * <value>igfs://ipc</value> + * </property> + * + * <property> + * <name>fs.igfs.impl</name> + * <value>org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem</value> + * </property> + * </pre> + * You should also add the Ignite JAR and all Ignite libraries to the Hadoop classpath. To + * do this, add the following lines to the {@code conf/hadoop-env.sh} script in the Hadoop + * distribution: + * <pre name="code" class="bash"> + * export IGNITE_HOME=/path/to/Ignite/distribution + * export HADOOP_CLASSPATH=$IGNITE_HOME/ignite*.jar + * + * for f in $IGNITE_HOME/libs/*.jar; do + * export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$f; + * done + * </pre> + * <h1 class="header">Data vs Client Nodes</h1> + * Hadoop needs to use its FileSystem remotely from client nodes as well as directly on + * data nodes. Client nodes are responsible for basic file system operations as well as + * accessing data nodes remotely. Usually, client nodes are started together + * with {@code job-submitter} or {@code job-scheduler} processes, while data nodes are + * started together with Hadoop {@code task-tracker} processes.
+ * <p> + * For sample client and data node configuration refer to {@code config/hadoop/default-config-client.xml} + * and {@code config/hadoop/default-config.xml} configuration files in the Ignite installation. + */ +public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closeable { + /** Logger. */ + private static final Log LOG = LogFactory.getLog(IgniteHadoopFileSystem.class); + + /** Ensures that close routine is invoked at most once. */ + private final AtomicBoolean closeGuard = new AtomicBoolean(); + + /** Grid remote client. */ + private HadoopIgfsWrapper rmtClient; + + /** The name of the user this file system was created on behalf of. */ + private final String user; + + /** Working directory. */ + private IgfsPath workingDir; + + /** URI. */ + private final URI uri; + + /** Authority. */ + private String uriAuthority; + + /** Client logger. */ + private IgfsLogger clientLog; + + /** Server block size. */ + private long grpBlockSize; + + /** Default replication factor. */ + private short dfltReplication; + + /** Secondary file system URI. */ + private URI secondaryUri; + + /** Mode resolver. */ + private IgfsModeResolver modeRslvr; + + /** The secondary file system factory. */ + private HadoopFileSystemFactory factory; + + /** Whether custom sequential reads before prefetch value is provided. */ + private boolean seqReadsBeforePrefetchOverride; + + /** Custom-provided sequential reads before prefetch. */ + private int seqReadsBeforePrefetch; + + /** Flag that controls whether file writes should be colocated on data node. */ + private boolean colocateFileWrites; + + /** Prefer local writes. */ + private boolean preferLocFileWrites; + + /** + * @param name URI for file system. + * @param cfg Configuration. + * @throws URISyntaxException If name has invalid syntax. + * @throws IOException If initialization failed. + */ + public IgniteHadoopFileSystem(URI name, Configuration cfg) throws URISyntaxException, IOException { + super(HadoopIgfsEndpoint.normalize(name), IGFS_SCHEME, false, -1); + + uri = name; + + user = getFsHadoopUser(); + + try { + initialize(name, cfg); + } + catch (IOException e) { + // Close client if exception occurred. + if (rmtClient != null) + rmtClient.close(false); + + throw e; + } + + workingDir = new IgfsPath("/user/" + user); + } + + /** {@inheritDoc} */ + @Override public void checkPath(Path path) { + URI uri = path.toUri(); + + if (uri.isAbsolute()) { + if (!F.eq(uri.getScheme(), IGFS_SCHEME)) + throw new InvalidPathException("Wrong path scheme [expected=" + IGFS_SCHEME + ", actual=" + + uri.getScheme() + ']'); + + if (!F.eq(uri.getAuthority(), uriAuthority)) + throw new InvalidPathException("Wrong path authority [expected=" + uriAuthority + ", actual=" + + uri.getAuthority() + ']'); + } + } + + /** + * Public setter that can be used by direct users of FS or Visor. + * + * @param colocateFileWrites Whether all ongoing file writes should be colocated. + */ + @SuppressWarnings("UnusedDeclaration") + public void colocateFileWrites(boolean colocateFileWrites) { + this.colocateFileWrites = colocateFileWrites; + } + + /** + * Enter busy state. + * + * @throws IOException If file system is stopped. + */ + private void enterBusy() throws IOException { + if (closeGuard.get()) + throw new IOException("File system is stopped."); + } + + /** + * Leave busy state. + */ + private void leaveBusy() { + // No-op. + } + + /** + * @param name URI passed to constructor. + * @param cfg Configuration passed to constructor.
+ * @throws IOException If initialization failed. + */ + @SuppressWarnings("ConstantConditions") + private void initialize(URI name, Configuration cfg) throws IOException { + enterBusy(); + + try { + if (rmtClient != null) + throw new IOException("File system is already initialized: " + rmtClient); + + A.notNull(name, "name"); + A.notNull(cfg, "cfg"); + + if (!IGFS_SCHEME.equals(name.getScheme())) + throw new IOException("Illegal file system URI [expected=" + IGFS_SCHEME + + "://[name]/[optional_path], actual=" + name + ']'); + + uriAuthority = name.getAuthority(); + + // Override sequential reads before prefetch if needed. + seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0); + + if (seqReadsBeforePrefetch > 0) + seqReadsBeforePrefetchOverride = true; + + // In Ignite, the replication factor is controlled by data cache affinity. + // We use the replication factor to force the whole file to be stored on the local node. + dfltReplication = (short)cfg.getInt("dfs.replication", 3); + + // Get file colocation control flag. + colocateFileWrites = parameter(cfg, PARAM_IGFS_COLOCATED_WRITES, uriAuthority, false); + preferLocFileWrites = cfg.getBoolean(PARAM_IGFS_PREFER_LOCAL_WRITES, false); + + // Get log directory. + String logDirCfg = parameter(cfg, PARAM_IGFS_LOG_DIR, uriAuthority, DFLT_IGFS_LOG_DIR); + + File logDirFile = U.resolveIgnitePath(logDirCfg); + + String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null; + + rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG, user); + + // Handshake. + IgfsHandshakeResponse handshake = rmtClient.handshake(logDir); + + grpBlockSize = handshake.blockSize(); + + IgfsPaths paths = handshake.secondaryPaths(); + + Boolean logEnabled = parameter(cfg, PARAM_IGFS_LOG_ENABLED, uriAuthority, false); + + if (handshake.sampling() != null ? handshake.sampling() : logEnabled) { + // Initialize client logger.
+ if (logDir == null) + throw new IOException("Failed to resolve log directory: " + logDirCfg); + + Integer batchSize = parameter(cfg, PARAM_IGFS_LOG_BATCH_SIZE, uriAuthority, DFLT_IGFS_LOG_BATCH_SIZE); + + clientLog = IgfsLogger.logger(uriAuthority, handshake.igfsName(), logDir, batchSize); + } + else + clientLog = IgfsLogger.disabledLogger(); + + try { + modeRslvr = new IgfsModeResolver(paths.defaultMode(), paths.pathModes()); + } + catch (IgniteCheckedException ice) { + throw new IOException(ice); + } + + boolean initSecondary = paths.defaultMode() == PROXY; + + if (!initSecondary && paths.pathModes() != null) { + for (T2<IgfsPath, IgfsMode> pathMode : paths.pathModes()) { + IgfsMode mode = pathMode.getValue(); + + if (mode == PROXY) { + initSecondary = true; + + break; + } + } + } + + if (initSecondary) { + try { + factory = (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader()); + } + catch (IgniteCheckedException e) { + throw new IOException("Failed to get secondary file system factory.", e); + } + + if (factory == null) + throw new IOException("Failed to get secondary file system factory (did you set " + + IgniteHadoopIgfsSecondaryFileSystem.class.getName() + " as \"secondaryFileSystem\" in " + + FileSystemConfiguration.class.getName() + "?)"); + + assert factory != null; + + if (factory instanceof LifecycleAware) + ((LifecycleAware) factory).start(); + + try { + FileSystem secFs = factory.get(user); + + secondaryUri = secFs.getUri(); + + A.ensure(secondaryUri != null, "Secondary file system URI should not be null."); + } + catch (IOException e) { + throw new IOException("Failed to connect to the secondary file system: " + secondaryUri, e); + } + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public void close() throws IOException { + if (closeGuard.compareAndSet(false, true)) { + if (rmtClient == null) + return; + + rmtClient.close(false); + + if (clientLog.isLogEnabled()) + clientLog.close(); + + if (factory instanceof LifecycleAware) + ((LifecycleAware) factory).stop(); + + // Reset initialized resources.
+ rmtClient = null; + } + } + + /** {@inheritDoc} */ + @Override public URI getUri() { + return uri; + } + + /** {@inheritDoc} */ + @Override public int getUriDefaultPort() { + return -1; + } + + /** {@inheritDoc} */ + @Override public FsServerDefaults getServerDefaults() throws IOException { + return new FsServerDefaults(grpBlockSize, (int)grpBlockSize, (int)grpBlockSize, dfltReplication, 64 * 1024, + false, 0, DataChecksum.Type.NULL); + } + + /** {@inheritDoc} */ + @Override public boolean setReplication(Path f, short replication) throws IOException { + return mode(f) == PROXY && secondaryFileSystem().setReplication(f, replication); + } + + /** {@inheritDoc} */ + @Override public void setTimes(Path f, long mtime, long atime) throws IOException { + if (mode(f) == PROXY) + secondaryFileSystem().setTimes(f, mtime, atime); + else { + if (mtime == -1 && atime == -1) + return; + + rmtClient.setTimes(convert(f), atime, mtime); + } + } + + /** {@inheritDoc} */ + @Override public FsStatus getFsStatus() throws IOException { + IgfsStatus status = rmtClient.fsStatus(); + + return new FsStatus(status.spaceTotal(), status.spaceUsed(), status.spaceTotal() - status.spaceUsed()); + } + + /** {@inheritDoc} */ + @Override public void setPermission(Path p, FsPermission perm) throws IOException { + enterBusy(); + + try { + A.notNull(p, "p"); + + if (mode(p) == PROXY) + secondaryFileSystem().setPermission(toSecondary(p), perm); + else { + if (rmtClient.update(convert(p), permission(perm)) == null) + throw new IOException("Failed to set file permission (file not found?)" + + " [path=" + p + ", perm=" + perm + ']'); + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public void setOwner(Path p, String usr, String grp) throws IOException { + A.notNull(p, "p"); + A.notNull(usr, "username"); + A.notNull(grp, "grpName"); + + enterBusy(); + + try { + if (mode(p) == PROXY) + secondaryFileSystem().setOwner(toSecondary(p), usr, grp); + else if (rmtClient.update(convert(p), F.asMap(IgfsUtils.PROP_USER_NAME, usr, + IgfsUtils.PROP_GROUP_NAME, grp)) == null) { + throw new IOException("Failed to set file owner (file not found?)" + + " [path=" + p + ", username=" + usr + ", grpName=" + grp + ']'); + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public FSDataInputStream open(Path f, int bufSize) throws IOException { + A.notNull(f, "f"); + + enterBusy(); + + try { + IgfsPath path = convert(f); + IgfsMode mode = modeRslvr.resolveMode(path); + + if (mode == PROXY) { + FSDataInputStream is = secondaryFileSystem().open(toSecondary(f), bufSize); + + if (clientLog.isLogEnabled()) { + // At this point we do not know the file size, so we perform an additional request to the remote FS to get it. + FileStatus status = secondaryFileSystem().getFileStatus(toSecondary(f)); + + long size = status != null ? status.getLen() : -1; + + long logId = IgfsLogger.nextId(); + + clientLog.logOpen(logId, path, PROXY, bufSize, size); + + return new FSDataInputStream(new HadoopIgfsProxyInputStream(is, clientLog, logId)); + } + else + return is; + } + else { + HadoopIgfsStreamDelegate stream = seqReadsBeforePrefetchOverride ?
+ rmtClient.open(path, seqReadsBeforePrefetch) : rmtClient.open(path); + + long logId = -1; + + if (clientLog.isLogEnabled()) { + logId = IgfsLogger.nextId(); + + clientLog.logOpen(logId, path, mode, bufSize, stream.length()); + } + + if (LOG.isDebugEnabled()) + LOG.debug("Opening input stream [thread=" + Thread.currentThread().getName() + ", path=" + path + + ", bufSize=" + bufSize + ']'); + + HadoopIgfsInputStream igfsIn = new HadoopIgfsInputStream(stream, stream.length(), + bufSize, LOG, clientLog, logId); + + if (LOG.isDebugEnabled()) + LOG.debug("Opened input stream [path=" + path + ", delegate=" + stream + ']'); + + return new FSDataInputStream(igfsIn); + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override public FSDataOutputStream createInternal( + Path f, + EnumSet<CreateFlag> flag, + FsPermission perm, + int bufSize, + short replication, + long blockSize, + Progressable progress, + Options.ChecksumOpt checksumOpt, + boolean createParent + ) throws IOException { + A.notNull(f, "f"); + + enterBusy(); + + boolean overwrite = flag.contains(CreateFlag.OVERWRITE); + boolean append = flag.contains(CreateFlag.APPEND); + boolean create = flag.contains(CreateFlag.CREATE); + + OutputStream out = null; + + try { + IgfsPath path = convert(f); + IgfsMode mode = modeRslvr.resolveMode(path); + + if (LOG.isDebugEnabled()) + LOG.debug("Opening output stream in create [thread=" + Thread.currentThread().getName() + ", path=" + + path + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ']'); + + if (mode == PROXY) { + FSDataOutputStream os = secondaryFileSystem().create(toSecondary(f), perm, flag, bufSize, + replication, blockSize, progress); + + if (clientLog.isLogEnabled()) { + long logId = IgfsLogger.nextId(); + + if (append) + clientLog.logAppend(logId, path, PROXY, bufSize); // Don't have stream ID. + else + clientLog.logCreate(logId, path, PROXY, overwrite, bufSize, replication, blockSize); + + return new FSDataOutputStream(new HadoopIgfsProxyOutputStream(os, clientLog, logId)); + } + else + return os; + } + else { + Map<String, String> permMap = F.asMap(IgfsUtils.PROP_PERMISSION, toString(perm), + IgfsUtils.PROP_PREFER_LOCAL_WRITES, Boolean.toString(preferLocFileWrites)); + + // Create stream and close it in the 'finally' section if any subsequent operation fails. + HadoopIgfsStreamDelegate stream; + + long logId = -1; + + if (append) { + stream = rmtClient.append(path, create, permMap); + + if (clientLog.isLogEnabled()) { + logId = IgfsLogger.nextId(); + + clientLog.logAppend(logId, path, mode, bufSize); + } + + if (LOG.isDebugEnabled()) + LOG.debug("Opened output stream in append [path=" + path + ", delegate=" + stream + ']'); + } + else { + stream = rmtClient.create(path, overwrite, colocateFileWrites, replication, blockSize, + permMap); + + if (clientLog.isLogEnabled()) { + logId = IgfsLogger.nextId(); + + clientLog.logCreate(logId, path, mode, overwrite, bufSize, replication, blockSize); + } + + if (LOG.isDebugEnabled()) + LOG.debug("Opened output stream in create [path=" + path + ", delegate=" + stream + ']'); + } + + assert stream != null; + + HadoopIgfsOutputStream igfsOut = new HadoopIgfsOutputStream(stream, LOG, + clientLog, logId); + + bufSize = Math.max(64 * 1024, bufSize); + + out = new BufferedOutputStream(igfsOut, bufSize); + + FSDataOutputStream res = new FSDataOutputStream(out, null, 0); + + // Mark stream created successfully.
+ out = null; + + return res; + } + } + finally { + // Close if failed during stream creation. + if (out != null) + U.closeQuiet(out); + + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public boolean supportsSymlinks() { + return false; + } + + /** {@inheritDoc} */ + @Override public void renameInternal(Path src, Path dst) throws IOException { + A.notNull(src, "src"); + A.notNull(dst, "dst"); + + enterBusy(); + + try { + IgfsPath srcPath = convert(src); + IgfsPath dstPath = convert(dst); + + IgfsMode srcMode = modeRslvr.resolveMode(srcPath); + + if (clientLog.isLogEnabled()) + clientLog.logRename(srcPath, srcMode, dstPath); + + if (srcMode == PROXY) + secondaryFileSystem().rename(toSecondary(src), toSecondary(dst)); + else + rmtClient.rename(srcPath, dstPath); + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public boolean delete(Path f, boolean recursive) throws IOException { + A.notNull(f, "f"); + + enterBusy(); + + try { + IgfsPath path = convert(f); + + IgfsMode mode = modeRslvr.resolveMode(path); + + if (mode == PROXY) { + if (clientLog.isLogEnabled()) + clientLog.logDelete(path, PROXY, recursive); + + return secondaryFileSystem().delete(toSecondary(f), recursive); + } + + boolean res = rmtClient.delete(path, recursive); + + if (clientLog.isLogEnabled()) + clientLog.logDelete(path, mode, recursive); + + return res; + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public void setVerifyChecksum(boolean verifyChecksum) throws IOException { + // Checksum has effect for secondary FS only. + if (factory != null) + secondaryFileSystem().setVerifyChecksum(verifyChecksum); + } + + /** {@inheritDoc} */ + @Override public FileChecksum getFileChecksum(Path f) throws IOException { + if (mode(f) == PROXY) + return secondaryFileSystem().getFileChecksum(f); + + return null; + } + + /** {@inheritDoc} */ + @Override public FileStatus[] listStatus(Path f) throws IOException { + A.notNull(f, "f"); + + enterBusy(); + + try { + IgfsPath path = convert(f); + IgfsMode mode = modeRslvr.resolveMode(path); + + if (mode == PROXY) { + FileStatus[] arr = secondaryFileSystem().listStatus(toSecondary(f)); + + if (arr == null) + throw new FileNotFoundException("File " + f + " does not exist."); + + for (int i = 0; i < arr.length; i++) + arr[i] = toPrimary(arr[i]); + + if (clientLog.isLogEnabled()) { + String[] fileArr = new String[arr.length]; + + for (int i = 0; i < arr.length; i++) + fileArr[i] = arr[i].getPath().toString(); + + clientLog.logListDirectory(path, PROXY, fileArr); + } + + return arr; + } + else { + Collection<IgfsFile> list = rmtClient.listFiles(path); + + if (list == null) + throw new FileNotFoundException("File " + f + " does not exist."); + + List<IgfsFile> files = new ArrayList<>(list); + + FileStatus[] arr = new FileStatus[files.size()]; + + for (int i = 0; i < arr.length; i++) + arr[i] = convert(files.get(i)); + + if (clientLog.isLogEnabled()) { + String[] fileArr = new String[arr.length]; + + for (int i = 0; i < arr.length; i++) + fileArr[i] = arr[i].getPath().toString(); + + clientLog.logListDirectory(path, mode, fileArr); + } + + return arr; + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public void mkdir(Path f, FsPermission perm, boolean createParent) throws IOException { + A.notNull(f, "f"); + + enterBusy(); + + try { + IgfsPath path = convert(f); + IgfsMode mode = modeRslvr.resolveMode(path); + + if (mode == PROXY) { + if (clientLog.isLogEnabled()) + clientLog.logMakeDirectory(path, PROXY); 
+ + secondaryFileSystem().mkdirs(toSecondary(f), perm); + } + else { + rmtClient.mkdirs(path, permission(perm)); + + if (clientLog.isLogEnabled()) + clientLog.logMakeDirectory(path, mode); + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public FileStatus getFileStatus(Path f) throws IOException { + A.notNull(f, "f"); + + enterBusy(); + + try { + if (mode(f) == PROXY) + return toPrimary(secondaryFileSystem().getFileStatus(toSecondary(f))); + else { + IgfsFile info = rmtClient.info(convert(f)); + + if (info == null) + throw new FileNotFoundException("File not found: " + f); + + return convert(info); + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public BlockLocation[] getFileBlockLocations(Path path, long start, long len) throws IOException { + A.notNull(path, "path"); + + IgfsPath igfsPath = convert(path); + + enterBusy(); + + try { + if (modeRslvr.resolveMode(igfsPath) == PROXY) + return secondaryFileSystem().getFileBlockLocations(path, start, len); + else { + long now = System.currentTimeMillis(); + + List<IgfsBlockLocation> affinity = new ArrayList<>( + rmtClient.affinity(igfsPath, start, len)); + + BlockLocation[] arr = new BlockLocation[affinity.size()]; + + for (int i = 0; i < arr.length; i++) + arr[i] = convert(affinity.get(i)); + + if (LOG.isDebugEnabled()) + LOG.debug("Fetched file locations [path=" + path + ", fetchTime=" + + (System.currentTimeMillis() - now) + ", locations=" + Arrays.asList(arr) + ']'); + + return arr; + } + } + finally { + leaveBusy(); + } + } + + /** + * Resolve path mode. + * + * @param path HDFS path. + * @return Path mode. + */ + public IgfsMode mode(Path path) { + return modeRslvr.resolveMode(convert(path)); + } + + /** + * Convert the given path to path acceptable by the primary file system. + * + * @param path Path. + * @return Primary file system path. + */ + private Path toPrimary(Path path) { + return convertPath(path, getUri()); + } + + /** + * Convert the given path to path acceptable by the secondary file system. + * + * @param path Path. + * @return Secondary file system path. + */ + private Path toSecondary(Path path) { + assert factory != null; + assert secondaryUri != null; + + return convertPath(path, secondaryUri); + } + + /** + * Convert path using the given new URI. + * + * @param path Old path. + * @param newUri New URI. + * @return New path. + */ + private Path convertPath(Path path, URI newUri) { + assert newUri != null; + + if (path != null) { + URI pathUri = path.toUri(); + + try { + return new Path(new URI(pathUri.getScheme() != null ? newUri.getScheme() : null, + pathUri.getAuthority() != null ? newUri.getAuthority() : null, pathUri.getPath(), null, null)); + } + catch (URISyntaxException e) { + throw new IgniteException("Failed to construct secondary file system path from the primary file " + + "system path: " + path, e); + } + } + else + return null; + } + + /** + * Convert a file status obtained from the secondary file system to a status of the primary file system. + * + * @param status Secondary file system status. + * @return Primary file system status. + */ + private FileStatus toPrimary(FileStatus status) { + return status != null ? new FileStatus(status.getLen(), status.isDirectory(), status.getReplication(), + status.getBlockSize(), status.getModificationTime(), status.getAccessTime(), status.getPermission(), + status.getOwner(), status.getGroup(), toPrimary(status.getPath())) : null; + } + + /** + * Convert IGFS path into Hadoop path. 
+ * + * @param path IGFS path. + * @return Hadoop path. + */ + private Path convert(IgfsPath path) { + return new Path(IGFS_SCHEME, uriAuthority, path.toString()); + } + + /** + * Convert Hadoop path into IGFS path. + * + * @param path Hadoop path. + * @return IGFS path. + */ + @Nullable private IgfsPath convert(Path path) { + if (path == null) + return null; + + return path.isAbsolute() ? new IgfsPath(path.toUri().getPath()) : + new IgfsPath(workingDir, path.toUri().getPath()); + } + + /** + * Convert IGFS affinity block location into Hadoop affinity block location. + * + * @param block IGFS affinity block location. + * @return Hadoop affinity block location. + */ + private BlockLocation convert(IgfsBlockLocation block) { + Collection<String> names = block.names(); + Collection<String> hosts = block.hosts(); + + return new BlockLocation( + names.toArray(new String[names.size()]) /* hostname:portNumber of data nodes */, + hosts.toArray(new String[hosts.size()]) /* hostnames of data nodes */, + block.start(), block.length() + ) { + @Override public String toString() { + try { + return "BlockLocation [offset=" + getOffset() + ", length=" + getLength() + + ", hosts=" + Arrays.asList(getHosts()) + ", names=" + Arrays.asList(getNames()) + ']'; + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + }; + } + + /** + * Convert IGFS file information into Hadoop file status. + * + * @param file IGFS file information. + * @return Hadoop file status. + */ + private FileStatus convert(IgfsFile file) { + return new FileStatus( + file.length(), + file.isDirectory(), + dfltReplication, + file.groupBlockSize(), + file.modificationTime(), + file.accessTime(), + permission(file), + file.property(IgfsUtils.PROP_USER_NAME, user), + file.property(IgfsUtils.PROP_GROUP_NAME, "users"), + convert(file.path())) { + @Override public String toString() { + return "FileStatus [path=" + getPath() + ", isDir=" + isDirectory() + ", len=" + getLen() + "]"; + } + }; + } + + /** + * Convert Hadoop permission into IGFS file attribute. + * + * @param perm Hadoop permission. + * @return IGFS attributes. + */ + private Map<String, String> permission(FsPermission perm) { + if (perm == null) + perm = FsPermission.getDefault(); + + return F.asMap(IgfsUtils.PROP_PERMISSION, toString(perm)); + } + + /** + * @param perm Permission. + * @return String. + */ + private static String toString(FsPermission perm) { + return String.format("%04o", perm.toShort()); + } + + /** + * Convert IGFS file attributes into Hadoop permission. + * + * @param file File info. + * @return Hadoop permission. + */ + private FsPermission permission(IgfsFile file) { + String perm = file.property(IgfsUtils.PROP_PERMISSION, null); + + if (perm == null) + return FsPermission.getDefault(); + + try { + return new FsPermission((short)Integer.parseInt(perm, 8)); + } + catch (NumberFormatException ignore) { + return FsPermission.getDefault(); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteHadoopFileSystem.class, this); + } + + /** + * Returns the user name this File System is created on behalf of. + * @return the user name + */ + public String user() { + return user; + } + + /** + * Gets cached or creates a {@link FileSystem}. + * + * @return The secondary file system. + */ + private FileSystem secondaryFileSystem() throws IOException{ + assert factory != null; + + return factory.get(user); + } +} \ No newline at end of file
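For reference, the v2 driver above is consumed through Hadoop's FileContext API rather than the legacy FileSystem one. Below is a minimal, hypothetical client sketch: it assumes a running IGFS node reachable at the igfs://igfs@localhost authority and relies on Hadoop's standard fs.AbstractFileSystem.<scheme>.impl lookup to bind the scheme; the class name, endpoint and paths are illustrative and not part of this commit.

    import java.net.URI;
    import java.util.EnumSet;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CreateFlag;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileContext;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class IgfsV2ClientSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();

            // Hadoop 2.x resolves AbstractFileSystem implementations through
            // fs.AbstractFileSystem.<scheme>.impl; "igfs" is the scheme served above.
            conf.set("fs.AbstractFileSystem.igfs.impl",
                "org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem");

            // The authority is a placeholder and must match a running IGFS endpoint.
            FileContext fc = FileContext.getFileContext(new URI("igfs://igfs@localhost"), conf);

            Path dir = new Path("/tmp/sample");

            fc.mkdir(dir, FsPermission.getDefault(), true);

            try (FSDataOutputStream out = fc.create(new Path(dir, "hello.txt"),
                EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE))) {
                out.writeUTF("Hello, IGFS!");
            }
        }
    }

Note how checkPath() above rejects any absolute path whose scheme or authority differs from the ones the file system was mounted with, so all paths in such a client must stay under the same igfs:// authority.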
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/package-info.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/package-info.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/package-info.java new file mode 100644 index 0000000..d8e70d1 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Contains Ignite Hadoop 2.x <code>FileSystem</code> implementation. + */ +package org.apache.ignite.hadoop.fs.v2; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java new file mode 100644 index 0000000..583af35 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.hadoop.mapreduce; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.protocol.ClientProtocol; +import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.client.GridClient; +import org.apache.ignite.internal.client.GridClientConfiguration; +import org.apache.ignite.internal.client.GridClientException; +import org.apache.ignite.internal.client.GridClientFactory; +import org.apache.ignite.internal.client.marshaller.jdk.GridClientJdkMarshaller; +import org.apache.ignite.internal.processors.hadoop.proto.HadoopClientProtocol; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.F; + +import static org.apache.ignite.internal.client.GridClientProtocol.TCP; + +/** + * Ignite Hadoop client protocol provider. + */ +public class IgniteHadoopClientProtocolProvider extends ClientProtocolProvider { + /** Framework name used in configuration. */ + public static final String FRAMEWORK_NAME = "ignite"; + + /** Clients. */ + private static final ConcurrentHashMap<String, IgniteInternalFuture<GridClient>> cliMap = new ConcurrentHashMap<>(); + + /** {@inheritDoc} */ + @Override public ClientProtocol create(Configuration conf) throws IOException { + if (FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) { + String addr = conf.get(MRConfig.MASTER_ADDRESS); + + if (F.isEmpty(addr)) + throw new IOException("Failed to create client protocol because server address is not specified (is the " + + MRConfig.MASTER_ADDRESS + " property set?)."); + + if (F.eq(addr, "local")) + throw new IOException("Local execution mode is not supported, please point " + + MRConfig.MASTER_ADDRESS + " to a real Ignite node."); + + return createProtocol(addr, conf); + } + + return null; + } + + /** {@inheritDoc} */ + @Override public ClientProtocol create(InetSocketAddress addr, Configuration conf) throws IOException { + if (FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) + return createProtocol(addr.getHostString() + ":" + addr.getPort(), conf); + + return null; + } + + /** {@inheritDoc} */ + @Override public void close(ClientProtocol cliProto) throws IOException { + // No-op. + } + + /** + * Internal protocol creation routine. + * + * @param addr Address. + * @param conf Configuration. + * @return Client protocol. + * @throws IOException If failed. + */ + private static ClientProtocol createProtocol(String addr, Configuration conf) throws IOException { + return new HadoopClientProtocol(conf, client(addr)); + } + + /** + * Create client. + * + * @param addr Endpoint address. + * @return Client. + * @throws IOException If failed.
+ */ + private static GridClient client(String addr) throws IOException { + try { + IgniteInternalFuture<GridClient> fut = cliMap.get(addr); + + if (fut == null) { + GridFutureAdapter<GridClient> fut0 = new GridFutureAdapter<>(); + + IgniteInternalFuture<GridClient> oldFut = cliMap.putIfAbsent(addr, fut0); + + if (oldFut != null) + return oldFut.get(); + else { + GridClientConfiguration cliCfg = new GridClientConfiguration(); + + cliCfg.setProtocol(TCP); + cliCfg.setServers(Collections.singletonList(addr)); + cliCfg.setMarshaller(new GridClientJdkMarshaller()); + cliCfg.setMaxConnectionIdleTime(24 * 60 * 60 * 1000L); // 1 day. + cliCfg.setDaemon(true); + + try { + GridClient cli = GridClientFactory.start(cliCfg); + + fut0.onDone(cli); + + return cli; + } + catch (GridClientException e) { + fut0.onDone(e); + + throw new IOException("Failed to establish connection with Ignite node: " + addr, e); + } + } + } + else + return fut.get(); + } + catch (IgniteCheckedException e) { + throw new IOException("Failed to establish connection with Ignite node: " + addr, e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java new file mode 100644 index 0000000..d4a44fa --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.hadoop.mapreduce; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.UUID; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.igfs.IgfsBlockLocation; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; +import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; +import org.apache.ignite.internal.processors.hadoop.HadoopJob; +import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan; +import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint; +import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan; +import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner; +import org.apache.ignite.internal.processors.igfs.IgfsEx; +import org.apache.ignite.internal.util.typedef.F; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.IgniteFileSystem.IGFS_SCHEME; + +/** + * Default map-reduce planner implementation. + */ +public class IgniteHadoopMapReducePlanner extends HadoopAbstractMapReducePlanner { + /** {@inheritDoc} */ + @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top, + @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException { + // Convert collection of topology nodes to collection of topology node IDs. + Collection<UUID> topIds = new HashSet<>(top.size(), 1.0f); + + for (ClusterNode topNode : top) + topIds.add(topNode.id()); + + Map<UUID, Collection<HadoopInputSplit>> mappers = mappers(top, topIds, job.input()); + + int rdcCnt = job.info().reducers(); + + if (rdcCnt < 0) + throw new IgniteCheckedException("Number of reducers must be non-negative, actual: " + rdcCnt); + + Map<UUID, int[]> reducers = reducers(top, mappers, rdcCnt); + + return new HadoopDefaultMapReducePlan(mappers, reducers); + } + + /** + * Create plan for mappers. + * + * @param top Topology nodes. + * @param topIds Topology node IDs. + * @param splits Splits. + * @return Mappers map. + * @throws IgniteCheckedException If failed. + */ + private Map<UUID, Collection<HadoopInputSplit>> mappers(Collection<ClusterNode> top, Collection<UUID> topIds, + Iterable<HadoopInputSplit> splits) throws IgniteCheckedException { + Map<UUID, Collection<HadoopInputSplit>> mappers = new HashMap<>(); + + Map<String, Collection<UUID>> nodes = groupByHost(top); + + Map<UUID, Integer> nodeLoads = new HashMap<>(top.size(), 1.0f); // Track node load. + + for (HadoopInputSplit split : splits) { + UUID nodeId = nodeForSplit(split, topIds, nodes, nodeLoads); + + if (log.isDebugEnabled()) + log.debug("Mapped split to node [split=" + split + ", nodeId=" + nodeId + ']'); + + Collection<HadoopInputSplit> nodeSplits = mappers.get(nodeId); + + if (nodeSplits == null) { + nodeSplits = new ArrayList<>(); + + mappers.put(nodeId, nodeSplits); + } + + nodeSplits.add(split); + + // Update node load. + nodeLoads.put(nodeId, nodeLoads.get(nodeId) + 1); + } + + return mappers; + } + + /** + * Determine the best node for this split.
+ * + * @param split Split. + * @param topIds Topology node IDs. + * @param nodes Nodes. + * @param nodeLoads Node load tracker. + * @return Node ID. + */ + @SuppressWarnings("unchecked") + private UUID nodeForSplit(HadoopInputSplit split, Collection<UUID> topIds, Map<String, Collection<UUID>> nodes, + Map<UUID, Integer> nodeLoads) throws IgniteCheckedException { + if (split instanceof HadoopFileBlock) { + HadoopFileBlock split0 = (HadoopFileBlock)split; + + if (IGFS_SCHEME.equalsIgnoreCase(split0.file().getScheme())) { + HadoopIgfsEndpoint endpoint = new HadoopIgfsEndpoint(split0.file().getAuthority()); + + IgfsEx igfs = null; + + if (F.eq(ignite.name(), endpoint.grid())) + igfs = (IgfsEx)((IgniteEx)ignite).igfsx(endpoint.igfs()); + + if (igfs != null && !igfs.isProxy(split0.file())) { + IgfsPath path = new IgfsPath(split0.file()); + + if (igfs.exists(path)) { + Collection<IgfsBlockLocation> blocks; + + try { + blocks = igfs.affinity(path, split0.start(), split0.length()); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + + assert blocks != null; + + if (blocks.size() == 1) + // Fast-path, split consists of one IGFS block (as in most cases). + return bestNode(blocks.iterator().next().nodeIds(), topIds, nodeLoads, false); + else { + // Slow-path, file consists of multiple IGFS blocks. First, find the most co-located nodes. + Map<UUID, Long> nodeMap = new HashMap<>(); + + List<UUID> bestNodeIds = null; + long bestLen = -1L; + + for (IgfsBlockLocation block : blocks) { + for (UUID blockNodeId : block.nodeIds()) { + if (topIds.contains(blockNodeId)) { + Long oldLen = nodeMap.get(blockNodeId); + long newLen = oldLen == null ? block.length() : oldLen + block.length(); + + nodeMap.put(blockNodeId, newLen); + + if (bestNodeIds == null || bestLen < newLen) { + bestNodeIds = new ArrayList<>(1); + + bestNodeIds.add(blockNodeId); + + bestLen = newLen; + } + else if (bestLen == newLen) { + assert !F.isEmpty(bestNodeIds); + + bestNodeIds.add(blockNodeId); + } + } + } + } + + if (bestNodeIds != null) { + return bestNodeIds.size() == 1 ? bestNodeIds.get(0) : + bestNode(bestNodeIds, topIds, nodeLoads, true); + } + } + } + } + } + } + + // Cannot use local IGFS for some reason, try selecting the node by host. + Collection<UUID> blockNodes = null; + + for (String host : split.hosts()) { + Collection<UUID> hostNodes = nodes.get(host); + + if (!F.isEmpty(hostNodes)) { + if (blockNodes == null) + blockNodes = new ArrayList<>(hostNodes); + else + blockNodes.addAll(hostNodes); + } + } + + return bestNode(blockNodes, topIds, nodeLoads, false); + } + + /** + * Finds the best (the least loaded) node among the candidates. + * + * @param candidates Candidates. + * @param topIds Topology node IDs. + * @param nodeLoads Known node loads. + * @param skipTopCheck Whether to skip topology check. + * @return The best node. + */ + private UUID bestNode(@Nullable Collection<UUID> candidates, Collection<UUID> topIds, Map<UUID, Integer> nodeLoads, + boolean skipTopCheck) { + UUID bestNode = null; + int bestLoad = Integer.MAX_VALUE; + + if (candidates != null) { + for (UUID candidate : candidates) { + if (skipTopCheck || topIds.contains(candidate)) { + int load = nodeLoads.get(candidate); + + if (bestNode == null || bestLoad > load) { + bestNode = candidate; + bestLoad = load; + + if (bestLoad == 0) + break; // Minimum load possible, no need for further iterations. 
+ } + } + } + + if (bestNode == null) { + // Blocks are located on nodes which are not Hadoop-enabled, assign to the least loaded one. + bestLoad = Integer.MAX_VALUE; + + for (UUID nodeId : topIds) { + int load = nodeLoads.get(nodeId); + + if (bestNode == null || bestLoad > load) { + bestNode = nodeId; + bestLoad = load; + + if (bestLoad == 0) + break; // Minimum load possible, no need for further iterations. + } + } + } + + assert bestNode != null; + + return bestNode; + } + + /** + * Create plan for reducers. + * + * @param top Topology. + * @param mappers Mappers map. + * @param reducerCnt Reducers count. + * @return Reducers map. + */ + private Map<UUID, int[]> reducers(Collection<ClusterNode> top, + Map<UUID, Collection<HadoopInputSplit>> mappers, int reducerCnt) { + // Determine initial node weights. + int totalWeight = 0; + + List<WeightedNode> nodes = new ArrayList<>(top.size()); + + for (ClusterNode node : top) { + Collection<HadoopInputSplit> split = mappers.get(node.id()); + + int weight = reducerNodeWeight(node, split != null ? split.size() : 0); + + nodes.add(new WeightedNode(node.id(), weight, weight)); + + totalWeight += weight; + } + + // Adjust weights. + int totalAdjustedWeight = 0; + + for (WeightedNode node : nodes) { + node.floatWeight = ((float)node.weight * reducerCnt) / totalWeight; + + node.weight = Math.round(node.floatWeight); + + totalAdjustedWeight += node.weight; + } + + // Redistribute the surplus or deficit introduced by rounding. + Collections.sort(nodes); + + if (totalAdjustedWeight > reducerCnt) { + // Too many reducers assigned. + ListIterator<WeightedNode> iter = nodes.listIterator(nodes.size() - 1); + + while (totalAdjustedWeight != reducerCnt) { + if (!iter.hasPrevious()) + iter = nodes.listIterator(nodes.size() - 1); + + WeightedNode node = iter.previous(); + + if (node.weight > 0) { + node.weight -= 1; + + totalAdjustedWeight--; + } + } + } + else if (totalAdjustedWeight < reducerCnt) { + // Too few reducers assigned. + ListIterator<WeightedNode> iter = nodes.listIterator(0); + + while (totalAdjustedWeight != reducerCnt) { + if (!iter.hasNext()) + iter = nodes.listIterator(0); + + WeightedNode node = iter.next(); + + if (node.floatWeight > 0.0f) { + node.weight += 1; + + totalAdjustedWeight++; + } + } + } + + int idx = 0; + + Map<UUID, int[]> reducers = new HashMap<>(nodes.size(), 1.0f); + + for (WeightedNode node : nodes) { + if (node.weight > 0) { + int[] arr = new int[node.weight]; + + for (int i = 0; i < arr.length; i++) + arr[i] = idx++; + + reducers.put(node.nodeId, arr); + } + } + + return reducers; + } + + /** + * Calculate node weight based on node metrics and data co-location. + * + * @param node Node. + * @param splitCnt Splits mapped to this node. + * @return Node weight. + */ + @SuppressWarnings("UnusedParameters") + protected int reducerNodeWeight(ClusterNode node, int splitCnt) { + return splitCnt; + } + + /** + * Weighted node. + */ + private static class WeightedNode implements Comparable<WeightedNode> { + /** Node ID. */ + private final UUID nodeId; + + /** Weight. */ + private int weight; + + /** Floating point weight. */ + private float floatWeight; + + /** + * Constructor. + * + * @param nodeId Node ID. + * @param weight Weight. + * @param floatWeight Floating point weight.
+ */ + private WeightedNode(UUID nodeId, int weight, float floatWeight) { + this.nodeId = nodeId; + this.weight = weight; + this.floatWeight = floatWeight; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + return obj instanceof WeightedNode && F.eq(nodeId, ((WeightedNode)obj).nodeId); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return nodeId.hashCode(); + } + + /** {@inheritDoc} */ + @Override public int compareTo(@NotNull WeightedNode other) { + float res = other.floatWeight - floatWeight; + + return res > 0.0f ? 1 : res < 0.0f ? -1 : nodeId.compareTo(other.nodeId); + } + } +} \ No newline at end of file
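Taken together, the last two files cover both ends of job submission: Hadoop discovers IgniteHadoopClientProtocolProvider through its ClientProtocolProvider service mechanism and asks it to create(Configuration), which succeeds only when mapreduce.framework.name is "ignite", while the planner decides mapper and reducer placement on the cluster side. A minimal, hypothetical submission client is sketched below; the 127.0.0.1:11211 endpoint and the igfs:// paths are placeholders, and the Ignite Hadoop JARs are assumed to be on the client classpath.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.MRConfig;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class IgniteSubmitSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();

            // Route submission through IgniteHadoopClientProtocolProvider: the
            // provider answers create(conf) only for this framework name.
            conf.set(MRConfig.FRAMEWORK_NAME, "ignite");

            // Must point at a live Ignite node; "local" is explicitly rejected.
            conf.set(MRConfig.MASTER_ADDRESS, "127.0.0.1:11211");

            Job job = Job.getInstance(conf, "sample-job");

            job.setJarByClass(IgniteSubmitSketch.class);
            // Mapper/reducer wiring is the usual Hadoop boilerplate and is omitted here.

            FileInputFormat.addInputPath(job, new Path("igfs:///input"));
            FileOutputFormat.setOutputPath(job, new Path("igfs:///output"));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

On the cluster side, reducer placement for such a job follows the weighted scheme above: each node's ideal share is computed as a float, rounded to an integer, and the rounding surplus or deficit is then walked off one reducer at a time across the weight-sorted node list.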