IGNITE-3414: Hadoop: implemented new weight-based map-reduce planner.
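The new planner distributes mappers and reducers using configurable per-task "weights"
(see IgniteHadoopWeightedMapReducePlanner below). As a quick orientation, here is a minimal
configuration sketch; it is not part of the commit, the weight values are illustrative only,
and it assumes the existing HadoopConfiguration.setMapReducePlanner(...) and
IgniteConfiguration.setHadoopConfiguration(...) hooks:

    IgniteHadoopWeightedMapReducePlanner planner = new IgniteHadoopWeightedMapReducePlanner();

    // Make a remote mapper twice as "expensive" as a local one, so the planner
    // strongly prefers nodes that actually host the input split data.
    planner.setLocalMapperWeight(100);
    planner.setRemoteMapperWeight(200);

    // Keep placing reducers next to mappers until a node's accumulated weight
    // reaches 300; after that, spill over to less loaded nodes.
    planner.setPreferLocalReducerThresholdWeight(300);

    HadoopConfiguration hadoopCfg = new HadoopConfiguration();

    hadoopCfg.setMapReducePlanner(planner);

    IgniteConfiguration cfg = new IgniteConfiguration();

    cfg.setHadoopConfiguration(hadoopCfg);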
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/73649386 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/73649386 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/73649386 Branch: refs/heads/ignite-2649 Commit: 736493865c1e3a56f864a01583d38e50d02b2c56 Parents: 5f57cc8 Author: vozerov-gridgain <voze...@gridgain.com> Authored: Tue Jul 19 15:16:21 2016 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Tue Jul 19 15:16:21 2016 +0300 ---------------------------------------------------------------------- .../processors/igfs/IgfsIgniteMock.java | 492 +++++++++++ .../internal/processors/igfs/IgfsMock.java | 397 +++++++++ .../mapreduce/IgniteHadoopMapReducePlanner.java | 48 +- .../IgniteHadoopWeightedMapReducePlanner.java | 846 +++++++++++++++++++ .../internal/processors/hadoop/HadoopUtils.java | 81 ++ .../planner/HadoopAbstractMapReducePlanner.java | 116 +++ .../planner/HadoopMapReducePlanGroup.java | 150 ++++ .../planner/HadoopMapReducePlanTopology.java | 89 ++ .../HadoopDefaultMapReducePlannerSelfTest.java | 451 +--------- .../processors/hadoop/HadoopMapReduceTest.java | 16 +- .../processors/hadoop/HadoopPlannerMockJob.java | 168 ++++ .../HadoopWeightedMapReducePlannerTest.java | 599 +++++++++++++ .../HadoopWeightedPlannerMapReduceTest.java | 38 + .../testsuites/IgniteHadoopTestSuite.java | 8 +- 14 files changed, 3022 insertions(+), 477 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/73649386/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java new file mode 100644 index 0000000..0c55595 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java @@ -0,0 +1,492 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.IgniteAtomicLong; +import org.apache.ignite.IgniteAtomicReference; +import org.apache.ignite.IgniteAtomicSequence; +import org.apache.ignite.IgniteAtomicStamped; +import org.apache.ignite.IgniteBinary; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCompute; +import org.apache.ignite.IgniteCountDownLatch; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteEvents; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteFileSystem; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteMessaging; +import org.apache.ignite.IgniteQueue; +import org.apache.ignite.IgniteScheduler; +import org.apache.ignite.IgniteSemaphore; +import org.apache.ignite.IgniteServices; +import org.apache.ignite.IgniteSet; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cluster.ClusterGroup; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.CollectionConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.cluster.IgniteClusterEx; +import org.apache.ignite.internal.processors.cache.GridCacheUtilityKey; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.apache.ignite.internal.processors.hadoop.Hadoop; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.plugin.IgnitePlugin; +import org.apache.ignite.plugin.PluginNotFoundException; +import org.jetbrains.annotations.Nullable; + +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.ExecutorService; + +/** + * Mocked Ignite implementation for IGFS tests. + */ +public class IgfsIgniteMock implements IgniteEx { + /** Name. */ + private final String name; + + /** IGFS. */ + private final IgniteFileSystem igfs; + + /** + * Constructor. + * + * @param igfs IGFS instance. + */ + public IgfsIgniteMock(@Nullable String name, IgniteFileSystem igfs) { + this.name = name; + this.igfs = igfs; + } + + /** {@inheritDoc} */ + @Override public <K extends GridCacheUtilityKey, V> IgniteInternalCache<K, V> utilityCache() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Nullable @Override public <K, V> IgniteInternalCache<K, V> cachex(@Nullable String name) { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Nullable @Override public <K, V> IgniteInternalCache<K, V> cachex() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public Collection<IgniteInternalCache<?, ?>> cachesx( + @Nullable IgnitePredicate<? super IgniteInternalCache<?, ?>>... 
p) { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public boolean eventUserRecordable(int type) { + throwUnsupported(); + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean allEventsUserRecordable(int[] types) { + throwUnsupported(); + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isJmxRemoteEnabled() { + throwUnsupported(); + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isRestartEnabled() { + throwUnsupported(); + + return false; + } + + /** {@inheritDoc} */ + @Nullable @Override public IgniteFileSystem igfsx(@Nullable String name) { + return F.eq(name, igfs.name()) ? igfs : null; + } + + /** {@inheritDoc} */ + @Override public Hadoop hadoop() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteClusterEx cluster() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Nullable @Override public String latestVersion() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public ClusterNode localNode() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public GridKernalContext context() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public String name() { + return name; + } + + /** {@inheritDoc} */ + @Override public IgniteLogger log() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteConfiguration configuration() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteCompute compute() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteCompute compute(ClusterGroup grp) { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteMessaging message() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteMessaging message(ClusterGroup grp) { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteEvents events() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteEvents events(ClusterGroup grp) { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteServices services() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteServices services(ClusterGroup grp) { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public ExecutorService executorService() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public ExecutorService executorService(ClusterGroup grp) { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteProductVersion version() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteScheduler scheduler() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public <K, V> IgniteCache<K, V> createCache(CacheConfiguration<K, V> cacheCfg) { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public <K, V> IgniteCache<K, V> createCache(String cacheName) { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public <K, V> IgniteCache<K, V> getOrCreateCache(CacheConfiguration<K, V> cacheCfg) { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + 
@Override public <K, V> IgniteCache<K, V> getOrCreateCache(String cacheName) { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public <K, V> void addCacheConfiguration(CacheConfiguration<K, V> cacheCfg) { + throwUnsupported(); + } + + /** {@inheritDoc} */ + @Override public <K, V> IgniteCache<K, V> createCache(CacheConfiguration<K, V> cacheCfg, + NearCacheConfiguration<K, V> nearCfg) { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public <K, V> IgniteCache<K, V> getOrCreateCache(CacheConfiguration<K, V> cacheCfg, + NearCacheConfiguration<K, V> nearCfg) { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public <K, V> IgniteCache<K, V> createNearCache(@Nullable String cacheName, + NearCacheConfiguration<K, V> nearCfg) { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public <K, V> IgniteCache<K, V> getOrCreateNearCache(@Nullable String cacheName, + NearCacheConfiguration<K, V> nearCfg) { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public void destroyCache(String cacheName) { + throwUnsupported(); + } + + /** {@inheritDoc} */ + @Override public <K, V> IgniteCache<K, V> cache(@Nullable String name) { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public Collection<String> cacheNames() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteTransactions transactions() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public <K, V> IgniteDataStreamer<K, V> dataStreamer(@Nullable String cacheName) { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteFileSystem fileSystem(String name) { + IgniteFileSystem res = igfsx(name); + + if (res == null) + throw new IllegalArgumentException("IGFS is not configured: " + name); + + return res; + } + + /** {@inheritDoc} */ + @Override public Collection<IgniteFileSystem> fileSystems() { + return Collections.singleton(igfs); + } + + /** {@inheritDoc} */ + @Override public IgniteAtomicSequence atomicSequence(String name, long initVal, boolean create) + throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteAtomicLong atomicLong(String name, long initVal, boolean create) throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public <T> IgniteAtomicReference<T> atomicReference(String name, @Nullable T initVal, boolean create) + throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public <T, S> IgniteAtomicStamped<T, S> atomicStamped(String name, @Nullable T initVal, + @Nullable S initStamp, boolean create) throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteCountDownLatch countDownLatch(String name, int cnt, boolean autoDel, boolean create) + throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteSemaphore semaphore(String name, int cnt, boolean failoverSafe, boolean create) + throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public <T> IgniteQueue<T> queue(String name, int cap, @Nullable CollectionConfiguration cfg) + throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + 
@Override public <T> IgniteSet<T> set(String name, @Nullable CollectionConfiguration cfg) throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public <T extends IgnitePlugin> T plugin(String name) throws PluginNotFoundException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteBinary binary() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteException { + throwUnsupported(); + } + + /** {@inheritDoc} */ + @Override public <K> Affinity<K> affinity(String cacheName) { + throwUnsupported(); + + return null; + } + + /** + * Throw {@link UnsupportedOperationException}. + */ + private static void throwUnsupported() { + throw new UnsupportedOperationException("Should not be called!"); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/73649386/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java new file mode 100644 index 0000000..dccab4a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteFileSystem; +import org.apache.ignite.configuration.FileSystemConfiguration; +import org.apache.ignite.igfs.IgfsBlockLocation; +import org.apache.ignite.igfs.IgfsFile; +import org.apache.ignite.igfs.IgfsMetrics; +import org.apache.ignite.igfs.IgfsOutputStream; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.igfs.IgfsPathSummary; +import org.apache.ignite.igfs.mapreduce.IgfsRecordResolver; +import org.apache.ignite.igfs.mapreduce.IgfsTask; +import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +import java.net.URI; +import java.util.Collection; +import java.util.Map; + +/** + * Mocked IGFS implementation for IGFS tests. + */ +public class IgfsMock implements IgfsEx { + /** Name. */ + private final String name; + + /** + * Constructor. + * + * @param name Name. 
+ */ + public IgfsMock(@Nullable String name) { + this.name = name; + } + + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) { + throwUnsupported(); + } + + /** {@inheritDoc} */ + @Override public IgfsContext context() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsPaths proxyPaths() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize, int seqReadsBeforePrefetch) throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsInputStreamAdapter open(IgfsPath path) throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize) throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsStatus globalSpace() throws IgniteCheckedException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public void globalSampling(@Nullable Boolean val) throws IgniteCheckedException { + throwUnsupported(); + } + + /** {@inheritDoc} */ + @Nullable @Override public Boolean globalSampling() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsLocalMetrics localMetrics() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public long groupBlockSize() { + throwUnsupported(); + + return 0; + } + + /** {@inheritDoc} */ + @Nullable @Override public String clientLogDirectory() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public void clientLogDirectory(String logDir) { + throwUnsupported(); + } + + /** {@inheritDoc} */ + @Override public boolean evictExclude(IgfsPath path, boolean primary) { + throwUnsupported(); + + return false; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid nextAffinityKey() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isProxy(URI path) { + throwUnsupported(); + + return false; + } + + /** {@inheritDoc} */ + @Override public IgfsSecondaryFileSystem asSecondary() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public String name() { + return name; + } + + /** {@inheritDoc} */ + @Override public FileSystemConfiguration configuration() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsPathSummary summary(IgfsPath path) throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsOutputStream create(IgfsPath path, boolean overwrite) throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsOutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication, + long blockSize, @Nullable Map<String, String> props) throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsOutputStream create(IgfsPath path, int bufSize, boolean overwrite, @Nullable IgniteUuid affKey, + int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsOutputStream append(IgfsPath path, boolean create) throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} 
*/ + @Override public IgfsOutputStream append(IgfsPath path, int bufSize, boolean create, + @Nullable Map<String, String> props) throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException { + throwUnsupported(); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) + throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len, long maxLen) + throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsMetrics metrics() throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public void resetMetrics() throws IgniteException { + throwUnsupported(); + } + + /** {@inheritDoc} */ + @Override public long size(IgfsPath path) throws IgniteException { + throwUnsupported(); + + return 0; + } + + /** {@inheritDoc} */ + @Override public void format() throws IgniteException { + throwUnsupported(); + } + + /** {@inheritDoc} */ + @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, + Collection<IgfsPath> paths, @Nullable T arg) throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, + Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) + throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls, @Nullable IgfsRecordResolver rslvr, + Collection<IgfsPath> paths, @Nullable T arg) throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public <T, R> R execute(Class<? 
extends IgfsTask<T, R>> taskCls, @Nullable IgfsRecordResolver rslvr, + Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) + throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public boolean exists(IgfsPath path) { + throwUnsupported(); + + return false; + } + + /** {@inheritDoc} */ + @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public void rename(IgfsPath src, IgfsPath dest) throws IgniteException { + throwUnsupported(); + } + + /** {@inheritDoc} */ + @Override public boolean delete(IgfsPath path, boolean recursive) throws IgniteException { + throwUnsupported(); + + return false; + } + + /** {@inheritDoc} */ + @Override public void mkdirs(IgfsPath path) throws IgniteException { + throwUnsupported(); + } + + /** {@inheritDoc} */ + @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) throws IgniteException { + throwUnsupported(); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Nullable @Override public IgfsFile info(IgfsPath path) throws IgniteException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public long usedSpaceSize() throws IgniteException { + throwUnsupported(); + + return 0; + } + + /** {@inheritDoc} */ + @Override public IgniteFileSystem withAsync() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isAsync() { + throwUnsupported(); + + return false; + } + + /** {@inheritDoc} */ + @Override public <R> IgniteFuture<R> future() { + throwUnsupported(); + + return null; + } + + /** + * Throw {@link UnsupportedOperationException}. 
+ */ + private static void throwUnsupported() { + throw new UnsupportedOperationException("Should not be called!"); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/73649386/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java index 287b5ec..d4a44fa 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java @@ -26,10 +26,9 @@ import java.util.List; import java.util.ListIterator; import java.util.Map; import java.util.UUID; -import org.apache.ignite.Ignite; + import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.igfs.IgfsBlockLocation; import org.apache.ignite.igfs.IgfsPath; @@ -38,14 +37,11 @@ import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; import org.apache.ignite.internal.processors.hadoop.HadoopJob; import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint; import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan; +import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner; import org.apache.ignite.internal.processors.igfs.IgfsEx; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.resources.IgniteInstanceResource; -import org.apache.ignite.resources.LoggerResource; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -54,16 +50,7 @@ import static org.apache.ignite.IgniteFileSystem.IGFS_SCHEME; /** * Default map-reduce planner implementation. */ -public class IgniteHadoopMapReducePlanner implements HadoopMapReducePlanner { - /** Injected grid. */ - @IgniteInstanceResource - private Ignite ignite; - - /** Logger. */ - @SuppressWarnings("UnusedDeclaration") - @LoggerResource - private IgniteLogger log; - +public class IgniteHadoopMapReducePlanner extends HadoopAbstractMapReducePlanner { /** {@inheritDoc} */ @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top, @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException { @@ -98,7 +85,7 @@ public class IgniteHadoopMapReducePlanner implements HadoopMapReducePlanner { Iterable<HadoopInputSplit> splits) throws IgniteCheckedException { Map<UUID, Collection<HadoopInputSplit>> mappers = new HashMap<>(); - Map<String, Collection<UUID>> nodes = hosts(top); + Map<String, Collection<UUID>> nodes = groupByHost(top); Map<UUID, Integer> nodeLoads = new HashMap<>(top.size(), 1.0f); // Track node load. @@ -129,33 +116,6 @@ public class IgniteHadoopMapReducePlanner implements HadoopMapReducePlanner { } /** - * Groups nodes by host names. - * - * @param top Topology to group. - * @return Map. 
- */ - private static Map<String, Collection<UUID>> hosts(Collection<ClusterNode> top) { - Map<String, Collection<UUID>> grouped = U.newHashMap(top.size()); - - for (ClusterNode node : top) { - for (String host : node.hostNames()) { - Collection<UUID> nodeIds = grouped.get(host); - - if (nodeIds == null) { - // Expecting 1-2 nodes per host. - nodeIds = new ArrayList<>(2); - - grouped.put(host, nodeIds); - } - - nodeIds.add(node.id()); - } - } - - return grouped; - } - - /** * Determine the best node for this split. * * @param split Split. http://git-wip-us.apache.org/repos/asf/ignite/blob/73649386/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java new file mode 100644 index 0000000..27ffc19 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java @@ -0,0 +1,846 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.ignite.hadoop.mapreduce;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
+import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanGroup;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanTopology;
+import org.apache.ignite.internal.processors.igfs.IgfsEx;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Map-reduce planner which assigns mappers and reducers based on their "weights". A weight describes how many
+ * resources are required to execute a particular map or reduce task.
+ * <p>
+ * Plan creation consists of two steps: assigning mappers and assigning reducers.
+ * <p>
+ * Mappers are assigned based on input split data location. For each input split we search for nodes where
+ * its data is stored. The planner tries to assign mappers to their affinity nodes first. This process is governed
+ * by two properties:
+ * <ul>
+ *     <li><b>{@code localMapperWeight}</b> - weight of a map task when it is executed on an affinity node;</li>
+ *     <li><b>{@code remoteMapperWeight}</b> - weight of a map task when it is executed on a non-affinity node.</li>
+ * </ul>
+ * The planning algorithm assigns mappers so that the total resulting weight on all nodes is as small as possible.
+ * <p>
+ * Reducers are assigned differently. First we try to distribute reducers across nodes with mappers, which
+ * minimizes expensive data transfer over the network. A reducer assigned to a node with a mapper is considered
+ * <b>{@code local}</b>; otherwise it is considered <b>{@code remote}</b>. This process continues on a node until a
+ * certain weight threshold is reached, which means that the node is already busy enough and should no longer have
+ * higher priority than other nodes. The threshold can be configured through the
+ * <b>{@code preferLocalReducerThresholdWeight}</b> property.
+ * <p>
+ * When the local reducer threshold is reached on all nodes, the remaining reducers are distributed based on their
+ * local and remote weights in the same way as it is done for mappers.
+ * This process is governed by two properties:
+ * <ul>
+ *     <li><b>{@code localReducerWeight}</b> - weight of a reduce task when it is executed on a node with
+ *     mappers;</li>
+ *     <li><b>{@code remoteReducerWeight}</b> - weight of a reduce task when it is executed on a node without
+ *     mappers.</li>
+ * </ul>
+ */
+public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReducePlanner {
+    /** Default local mapper weight. */
+    public static final int DFLT_LOC_MAPPER_WEIGHT = 100;
+
+    /** Default remote mapper weight. */
+    public static final int DFLT_RMT_MAPPER_WEIGHT = 100;
+
+    /** Default local reducer weight. */
+    public static final int DFLT_LOC_REDUCER_WEIGHT = 100;
+
+    /** Default remote reducer weight. */
+    public static final int DFLT_RMT_REDUCER_WEIGHT = 100;
+
+    /** Default reducer migration threshold weight. */
+    public static final int DFLT_PREFER_LOCAL_REDUCER_THRESHOLD_WEIGHT = 200;
+
+    /** Local mapper weight. */
+    private int locMapperWeight = DFLT_LOC_MAPPER_WEIGHT;
+
+    /** Remote mapper weight. */
+    private int rmtMapperWeight = DFLT_RMT_MAPPER_WEIGHT;
+
+    /** Local reducer weight. */
+    private int locReducerWeight = DFLT_LOC_REDUCER_WEIGHT;
+
+    /** Remote reducer weight. */
+    private int rmtReducerWeight = DFLT_RMT_REDUCER_WEIGHT;
+
+    /** Reducer migration threshold weight. */
+    private int preferLocReducerThresholdWeight = DFLT_PREFER_LOCAL_REDUCER_THRESHOLD_WEIGHT;
+
+    /** {@inheritDoc} */
+    @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> nodes,
+        @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException {
+        List<HadoopInputSplit> splits = HadoopUtils.sortInputSplits(job.input());
+        int reducerCnt = job.info().reducers();
+
+        if (reducerCnt < 0)
+            throw new IgniteCheckedException("Number of reducers must be non-negative, actual: " + reducerCnt);
+
+        HadoopMapReducePlanTopology top = topology(nodes);
+
+        Mappers mappers = assignMappers(splits, top);
+
+        Map<UUID, int[]> reducers = assignReducers(splits, top, mappers, reducerCnt);
+
+        return new HadoopDefaultMapReducePlan(mappers.nodeToSplits, reducers);
+    }
+
+    /**
+     * Assign mappers to nodes.
+     *
+     * @param splits Input splits.
+     * @param top Topology.
+     * @return Mappers.
+     * @throws IgniteCheckedException If failed.
+     */
+    private Mappers assignMappers(Collection<HadoopInputSplit> splits,
+        HadoopMapReducePlanTopology top) throws IgniteCheckedException {
+        Mappers res = new Mappers();
+
+        for (HadoopInputSplit split : splits) {
+            // Try getting IGFS affinity.
+            Collection<UUID> nodeIds = affinityNodesForSplit(split, top);
+
+            // Get best node.
+            UUID node = bestMapperNode(nodeIds, top);
+
+            assert node != null;
+
+            res.add(split, node);
+        }
+
+        return res;
+    }
+
+    /**
+     * Get affinity nodes for the given input split.
+     * <p>
+     * Order in the returned collection *is* significant, meaning that nodes containing more data
+     * go first. This way, the first nodes in the collection are considered preferable for scheduling.
+     *
+     * @param split Split.
+     * @param top Topology.
+     * @return Affinity nodes.
+     * @throws IgniteCheckedException If failed.
+     */
+    private Collection<UUID> affinityNodesForSplit(HadoopInputSplit split, HadoopMapReducePlanTopology top)
+        throws IgniteCheckedException {
+        Collection<UUID> igfsNodeIds = igfsAffinityNodesForSplit(split);
+
+        if (igfsNodeIds != null)
+            return igfsNodeIds;
+
+        Map<NodeIdAndLength, UUID> res = new TreeMap<>();
+
+        for (String host : split.hosts()) {
+            long len = split instanceof HadoopFileBlock ? ((HadoopFileBlock)split).length() : 0L;
+
+            HadoopMapReducePlanGroup grp = top.groupForHost(host);
+
+            if (grp != null) {
+                for (int i = 0; i < grp.nodeCount(); i++) {
+                    UUID nodeId = grp.nodeId(i);
+
+                    res.put(new NodeIdAndLength(nodeId, len), nodeId);
+                }
+            }
+        }
+
+        return new LinkedHashSet<>(res.values());
+    }
+
+    /**
+     * Get IGFS affinity nodes for split if possible.
+     * <p>
+     * Order in the returned collection *is* significant, meaning that nodes containing more data
+     * go first. This way, the first nodes in the collection are considered preferable for scheduling.
+     *
+     * @param split Input split.
+     * @return IGFS affinity or {@code null} if IGFS is not available.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable private Collection<UUID> igfsAffinityNodesForSplit(HadoopInputSplit split) throws IgniteCheckedException {
+        if (split instanceof HadoopFileBlock) {
+            HadoopFileBlock split0 = (HadoopFileBlock)split;
+
+            if (IgniteFileSystem.IGFS_SCHEME.equalsIgnoreCase(split0.file().getScheme())) {
+                HadoopIgfsEndpoint endpoint = new HadoopIgfsEndpoint(split0.file().getAuthority());
+
+                IgfsEx igfs = null;
+
+                if (F.eq(ignite.name(), endpoint.grid()))
+                    igfs = (IgfsEx)((IgniteEx)ignite).igfsx(endpoint.igfs());
+
+                if (igfs != null && !igfs.isProxy(split0.file())) {
+                    IgfsPath path = new IgfsPath(split0.file());
+
+                    if (igfs.exists(path)) {
+                        Collection<IgfsBlockLocation> blocks;
+
+                        try {
+                            blocks = igfs.affinity(path, split0.start(), split0.length());
+                        }
+                        catch (IgniteException e) {
+                            throw new IgniteCheckedException("Failed to get IGFS file block affinity [path=" + path +
+                                ", start=" + split0.start() + ", len=" + split0.length() + ']', e);
+                        }
+
+                        assert blocks != null;
+
+                        if (blocks.size() == 1)
+                            return blocks.iterator().next().nodeIds();
+                        else {
+                            // The most "local" nodes go first.
+                            Map<UUID, Long> idToLen = new HashMap<>();
+
+                            for (IgfsBlockLocation block : blocks) {
+                                for (UUID id : block.nodeIds()) {
+                                    Long len = idToLen.get(id);
+
+                                    idToLen.put(id, len == null ? block.length() : block.length() + len);
+                                }
+                            }
+
+                            // Sort the nodes in non-ascending order by contained data lengths.
+                            Map<NodeIdAndLength, UUID> res = new TreeMap<>();
+
+                            for (Map.Entry<UUID, Long> idToLenEntry : idToLen.entrySet()) {
+                                UUID id = idToLenEntry.getKey();
+
+                                res.put(new NodeIdAndLength(id, idToLenEntry.getValue()), id);
+                            }
+
+                            return new LinkedHashSet<>(res.values());
+                        }
+                    }
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Find the best mapper node.
+     *
+     * @param affIds Affinity node IDs.
+     * @param top Topology.
+     * @return Best node ID.
+     */
+    private UUID bestMapperNode(@Nullable Collection<UUID> affIds, HadoopMapReducePlanTopology top) {
+        // Priority node.
+        UUID prioAffId = F.first(affIds);
+
+        // Find group with the least weight.
+        HadoopMapReducePlanGroup resGrp = null;
+        MapperPriority resPrio = MapperPriority.NORMAL;
+        int resWeight = Integer.MAX_VALUE;
+
+        for (HadoopMapReducePlanGroup grp : top.groups()) {
+            MapperPriority prio = groupPriority(grp, affIds, prioAffId);
+
+            int weight = grp.weight() + (prio == MapperPriority.NORMAL ? rmtMapperWeight : locMapperWeight);
+
+            if (resGrp == null || weight < resWeight || weight == resWeight && prio.value() > resPrio.value()) {
+                resGrp = grp;
+                resPrio = prio;
+                resWeight = weight;
+            }
+        }
+
+        assert resGrp != null;
+
+        // Update group weight for further runs.
+        resGrp.weight(resWeight);
+
+        // Return the best node from the group.
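+        // Note: the group's weight was already updated above, so subsequent splits processed by
+        // assignMappers() will see the accumulated load and gravitate towards less loaded groups.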
+        return bestMapperNodeForGroup(resGrp, resPrio, affIds, prioAffId);
+    }
+
+    /**
+     * Get the best node in the group.
+     *
+     * @param grp Group.
+     * @param priority Priority.
+     * @param affIds Affinity IDs.
+     * @param prioAffId Priority affinity ID.
+     * @return Best node ID in the group.
+     */
+    private static UUID bestMapperNodeForGroup(HadoopMapReducePlanGroup grp, MapperPriority priority,
+        @Nullable Collection<UUID> affIds, @Nullable UUID prioAffId) {
+        // Return the best node from the group.
+        int idx = 0;
+
+        // This is a rare situation where several nodes are started on the same host.
+        if (!grp.single()) {
+            switch (priority) {
+                case NORMAL: {
+                    // Pick any node.
+                    idx = ThreadLocalRandom.current().nextInt(grp.nodeCount());
+
+                    break;
+                }
+                case HIGH: {
+                    // Pick any affinity node.
+                    assert affIds != null;
+
+                    List<Integer> cands = new ArrayList<>();
+
+                    for (int i = 0; i < grp.nodeCount(); i++) {
+                        UUID id = grp.nodeId(i);
+
+                        if (affIds.contains(id))
+                            cands.add(i);
+                    }
+
+                    idx = cands.get(ThreadLocalRandom.current().nextInt(cands.size()));
+
+                    break;
+                }
+                default: {
+                    // Find the primary node.
+                    assert prioAffId != null;
+
+                    for (int i = 0; i < grp.nodeCount(); i++) {
+                        UUID id = grp.nodeId(i);
+
+                        if (F.eq(id, prioAffId)) {
+                            idx = i;
+
+                            break;
+                        }
+                    }
+
+                    assert priority == MapperPriority.HIGHEST;
+                }
+            }
+        }
+
+        return grp.nodeId(idx);
+    }
+
+    /**
+     * Generate reducers.
+     *
+     * @param splits Input splits.
+     * @param top Topology.
+     * @param mappers Mappers.
+     * @param reducerCnt Reducer count.
+     * @return Reducers.
+     */
+    private Map<UUID, int[]> assignReducers(Collection<HadoopInputSplit> splits, HadoopMapReducePlanTopology top,
+        Mappers mappers, int reducerCnt) {
+        Map<UUID, Integer> reducers = assignReducers0(top, splits, mappers, reducerCnt);
+
+        int cnt = 0;
+
+        Map<UUID, int[]> res = new HashMap<>(reducers.size());
+
+        for (Map.Entry<UUID, Integer> reducerEntry : reducers.entrySet()) {
+            int[] arr = new int[reducerEntry.getValue()];
+
+            for (int i = 0; i < arr.length; i++)
+                arr[i] = cnt++;
+
+            res.put(reducerEntry.getKey(), arr);
+        }
+
+        assert reducerCnt == cnt : reducerCnt + " != " + cnt;
+
+        return res;
+    }
+
+    /**
+     * Calculate per-node reducer counts.
+     *
+     * @param top Topology.
+     * @param splits Input splits.
+     * @param mappers Mappers.
+     * @param reducerCnt Reducer count.
+     * @return Reducers.
+     */
+    private Map<UUID, Integer> assignReducers0(HadoopMapReducePlanTopology top, Collection<HadoopInputSplit> splits,
+        Mappers mappers, int reducerCnt) {
+        Map<UUID, Integer> res = new HashMap<>();
+
+        // Assign reducers to splits.
+        Map<HadoopInputSplit, Integer> splitToReducerCnt = assignReducersToSplits(splits, reducerCnt);
+
+        // Assign as many local reducers as possible.
+        int remaining = 0;
+
+        for (Map.Entry<HadoopInputSplit, Integer> entry : splitToReducerCnt.entrySet()) {
+            HadoopInputSplit split = entry.getKey();
+            int cnt = entry.getValue();
+
+            if (cnt > 0) {
+                int assigned = assignLocalReducers(split, cnt, top, mappers, res);
+
+                assert assigned <= cnt;
+
+                remaining += cnt - assigned;
+            }
+        }
+
+        // Assign the remaining reducers.
+        if (remaining > 0)
+            assignRemoteReducers(remaining, top, mappers, res);
+
+        return res;
+    }
+
+    /**
+     * Assign local split reducers.
+     *
+     * @param split Split.
+     * @param cnt Reducer count.
+     * @param top Topology.
+     * @param mappers Mappers.
+     * @param resMap Reducers result map.
+     * @return Number of locally assigned reducers.
+ */ + private int assignLocalReducers(HadoopInputSplit split, int cnt, HadoopMapReducePlanTopology top, Mappers mappers, + Map<UUID, Integer> resMap) { + // Dereference node. + UUID nodeId = mappers.splitToNode.get(split); + + assert nodeId != null; + + // Dereference group. + HadoopMapReducePlanGroup grp = top.groupForId(nodeId); + + assert grp != null; + + // Assign more reducers to the node until threshold is reached. + int res = 0; + + while (res < cnt && grp.weight() < preferLocReducerThresholdWeight) { + res++; + + grp.weight(grp.weight() + locReducerWeight); + } + + // Update result map. + if (res > 0) { + Integer reducerCnt = resMap.get(nodeId); + + resMap.put(nodeId, reducerCnt == null ? res : reducerCnt + res); + } + + return res; + } + + /** + * Assign remote reducers. Assign to the least loaded first. + * + * @param cnt Count. + * @param top Topology. + * @param mappers Mappers. + * @param resMap Reducers result map. + */ + private void assignRemoteReducers(int cnt, HadoopMapReducePlanTopology top, Mappers mappers, + Map<UUID, Integer> resMap) { + + TreeSet<HadoopMapReducePlanGroup> set = new TreeSet<>(new GroupWeightComparator()); + + set.addAll(top.groups()); + + while (cnt-- > 0) { + // The least loaded machine. + HadoopMapReducePlanGroup grp = set.first(); + + // Look for nodes with assigned splits. + List<UUID> splitNodeIds = null; + + for (int i = 0; i < grp.nodeCount(); i++) { + UUID nodeId = grp.nodeId(i); + + if (mappers.nodeToSplits.containsKey(nodeId)) { + if (splitNodeIds == null) + splitNodeIds = new ArrayList<>(2); + + splitNodeIds.add(nodeId); + } + } + + // Select best node. + UUID id; + int newWeight; + + if (splitNodeIds != null) { + id = splitNodeIds.get(ThreadLocalRandom.current().nextInt(splitNodeIds.size())); + + newWeight = grp.weight() + locReducerWeight; + } + else { + id = grp.nodeId(ThreadLocalRandom.current().nextInt(grp.nodeCount())); + + newWeight = grp.weight() + rmtReducerWeight; + } + + // Re-add entry with new weight. + boolean rmv = set.remove(grp); + + assert rmv; + + grp.weight(newWeight); + + boolean add = set.add(grp); + + assert add; + + // Update result map. + Integer res = resMap.get(id); + + resMap.put(id, res == null ? 1 : res + 1); + } + } + + /** + * Comparator based on group's weight. + */ + private static class GroupWeightComparator implements Comparator<HadoopMapReducePlanGroup> { + /** {@inheritDoc} */ + @Override public int compare(HadoopMapReducePlanGroup first, HadoopMapReducePlanGroup second) { + int res = first.weight() - second.weight(); + + if (res < 0) + return -1; + else if (res > 0) + return 1; + else + return first.macs().compareTo(second.macs()); + } + } + + /** + * Distribute reducers between splits. + * + * @param splits Splits. + * @param reducerCnt Reducer count. + * @return Map from input split to reducer count. + */ + private Map<HadoopInputSplit, Integer> assignReducersToSplits(Collection<HadoopInputSplit> splits, + int reducerCnt) { + Map<HadoopInputSplit, Integer> res = new IdentityHashMap<>(splits.size()); + + int base = reducerCnt / splits.size(); + int remainder = reducerCnt % splits.size(); + + for (HadoopInputSplit split : splits) { + int val = base; + + if (remainder > 0) { + val++; + + remainder--; + } + + res.put(split, val); + } + + assert remainder == 0; + + return res; + } + + /** + * Calculate group priority. + * + * @param grp Group. + * @param affIds Affinity IDs. + * @param prioAffId Priority affinity ID. + * @return Group priority. 
+     */
+    private static MapperPriority groupPriority(HadoopMapReducePlanGroup grp, @Nullable Collection<UUID> affIds,
+        @Nullable UUID prioAffId) {
+        assert F.isEmpty(affIds) ? prioAffId == null : prioAffId == F.first(affIds);
+        assert grp != null;
+
+        MapperPriority prio = MapperPriority.NORMAL;
+
+        if (!F.isEmpty(affIds)) {
+            for (int i = 0; i < grp.nodeCount(); i++) {
+                UUID id = grp.nodeId(i);
+
+                if (affIds.contains(id)) {
+                    prio = MapperPriority.HIGH;
+
+                    if (F.eq(prioAffId, id)) {
+                        prio = MapperPriority.HIGHEST;
+
+                        break;
+                    }
+                }
+            }
+        }
+
+        return prio;
+    }
+
+    /**
+     * Get local mapper weight. This weight is added to a node when a mapper is assigned and its input split data is
+     * located on this node (at least partially).
+     * <p>
+     * Defaults to {@link #DFLT_LOC_MAPPER_WEIGHT}.
+     *
+     * @return Local mapper weight.
+     */
+    public int getLocalMapperWeight() {
+        return locMapperWeight;
+    }
+
+    /**
+     * Set local mapper weight. See {@link #getLocalMapperWeight()} for more information.
+     *
+     * @param locMapperWeight Local mapper weight.
+     */
+    public void setLocalMapperWeight(int locMapperWeight) {
+        this.locMapperWeight = locMapperWeight;
+    }
+
+    /**
+     * Get remote mapper weight. This weight is added to a node when a mapper is assigned, but its input
+     * split data is not located on this node.
+     * <p>
+     * Defaults to {@link #DFLT_RMT_MAPPER_WEIGHT}.
+     *
+     * @return Remote mapper weight.
+     */
+    public int getRemoteMapperWeight() {
+        return rmtMapperWeight;
+    }
+
+    /**
+     * Set remote mapper weight. See {@link #getRemoteMapperWeight()} for more information.
+     *
+     * @param rmtMapperWeight Remote mapper weight.
+     */
+    public void setRemoteMapperWeight(int rmtMapperWeight) {
+        this.rmtMapperWeight = rmtMapperWeight;
+    }
+
+    /**
+     * Get local reducer weight. This weight is added to a node when a reducer is assigned and the node has at least
+     * one assigned mapper.
+     * <p>
+     * Defaults to {@link #DFLT_LOC_REDUCER_WEIGHT}.
+     *
+     * @return Local reducer weight.
+     */
+    public int getLocalReducerWeight() {
+        return locReducerWeight;
+    }
+
+    /**
+     * Set local reducer weight. See {@link #getLocalReducerWeight()} for more information.
+     *
+     * @param locReducerWeight Local reducer weight.
+     */
+    public void setLocalReducerWeight(int locReducerWeight) {
+        this.locReducerWeight = locReducerWeight;
+    }
+
+    /**
+     * Get remote reducer weight. This weight is added to a node when a reducer is assigned, but the node doesn't
+     * have any assigned mappers.
+     * <p>
+     * Defaults to {@link #DFLT_RMT_REDUCER_WEIGHT}.
+     *
+     * @return Remote reducer weight.
+     */
+    public int getRemoteReducerWeight() {
+        return rmtReducerWeight;
+    }
+
+    /**
+     * Set remote reducer weight. See {@link #getRemoteReducerWeight()} for more information.
+     *
+     * @param rmtReducerWeight Remote reducer weight.
+     */
+    public void setRemoteReducerWeight(int rmtReducerWeight) {
+        this.rmtReducerWeight = rmtReducerWeight;
+    }
+
+    /**
+     * Get reducer migration threshold weight. When the threshold is reached, a node with mappers is no longer
+     * considered preferred for further reducer assignments.
+     * <p>
+     * Defaults to {@link #DFLT_PREFER_LOCAL_REDUCER_THRESHOLD_WEIGHT}.
+     *
+     * @return Reducer migration threshold weight.
+     */
+    public int getPreferLocalReducerThresholdWeight() {
+        return preferLocReducerThresholdWeight;
+    }
+
+    /**
+     * Set reducer migration threshold weight. See {@link #getPreferLocalReducerThresholdWeight()} for more
+     * information.
+     *
+     * @param reducerMigrationThresholdWeight Reducer migration threshold weight.
+ */ + public void setPreferLocalReducerThresholdWeight(int reducerMigrationThresholdWeight) { + this.preferLocReducerThresholdWeight = reducerMigrationThresholdWeight; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteHadoopWeightedMapReducePlanner.class, this); + } + + /** + * Node ID and length. + */ + private static class NodeIdAndLength implements Comparable<NodeIdAndLength> { + /** Node ID. */ + private final UUID id; + + /** Length. */ + private final long len; + + /** + * Constructor. + * + * @param id Node ID. + * @param len Length. + */ + public NodeIdAndLength(UUID id, long len) { + this.id = id; + this.len = len; + } + + /** {@inheritDoc} */ + @SuppressWarnings("NullableProblems") + @Override public int compareTo(NodeIdAndLength obj) { + long res = len - obj.len; + + if (res > 0) + return -1; + else if (res < 0) + return 1; + else + return id.compareTo(obj.id); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id.hashCode(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + return obj instanceof NodeIdAndLength && F.eq(id, ((NodeIdAndLength)obj).id); + } + } + + /** + * Mappers. + */ + private static class Mappers { + /** Node-to-splits map. */ + private final Map<UUID, Collection<HadoopInputSplit>> nodeToSplits = new HashMap<>(); + + /** Split-to-node map. */ + private final Map<HadoopInputSplit, UUID> splitToNode = new IdentityHashMap<>(); + + /** + * Add mapping. + * + * @param split Split. + * @param node Node. + */ + public void add(HadoopInputSplit split, UUID node) { + Collection<HadoopInputSplit> nodeSplits = nodeToSplits.get(node); + + if (nodeSplits == null) { + nodeSplits = new HashSet<>(); + + nodeToSplits.put(node, nodeSplits); + } + + nodeSplits.add(split); + + splitToNode.put(split, node); + } + } + + /** + * Mapper priority enumeration. + */ + private enum MapperPriority { + /** Normal node. */ + NORMAL(0), + + /** (likely) Affinity node. */ + HIGH(1), + + /** (likely) Affinity node with the highest priority (e.g. because it hosts more data than other nodes). */ + HIGHEST(2); + + /** Value. */ + private final int val; + + /** + * Constructor. + * + * @param val Value. + */ + MapperPriority(int val) { + this.val = val; + } + + /** + * @return Value. + */ + public int value() { + return val; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/73649386/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java index 3fa963f..44d871a 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java @@ -25,8 +25,12 @@ import java.io.ObjectInputStream; import java.io.ObjectOutput; import java.io.ObjectOutputStream; import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.TreeSet; import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -352,4 +356,81 @@ public class HadoopUtils { } } + /** + * Sort input splits by length. 
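+     * Splits are ordered from the longest to the shortest (ties are broken by the original order),
+     * so the heaviest tasks are planned first.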
+ * + * @param splits Splits. + * @return Sorted splits. + */ + public static List<HadoopInputSplit> sortInputSplits(Collection<HadoopInputSplit> splits) { + int id = 0; + + TreeSet<SplitSortWrapper> sortedSplits = new TreeSet<>(); + + for (HadoopInputSplit split : splits) { + long len = split instanceof HadoopFileBlock ? ((HadoopFileBlock)split).length() : 0; + + sortedSplits.add(new SplitSortWrapper(id++, split, len)); + } + + ArrayList<HadoopInputSplit> res = new ArrayList<>(sortedSplits.size()); + + for (SplitSortWrapper sortedSplit : sortedSplits) + res.add(sortedSplit.split); + + return res; + } + + /** + * Split wrapper for sorting. + */ + private static class SplitSortWrapper implements Comparable<SplitSortWrapper> { + /** Unique ID. */ + private final int id; + + /** Split. */ + private final HadoopInputSplit split; + + /** Split length. */ + private final long len; + + /** + * Constructor. + * + * @param id Unique ID. + * @param split Split. + * @param len Split length. + */ + public SplitSortWrapper(int id, HadoopInputSplit split, long len) { + this.id = id; + this.split = split; + this.len = len; + } + + /** {@inheritDoc} */ + @SuppressWarnings("NullableProblems") + @Override public int compareTo(SplitSortWrapper other) { + assert other != null; + + long res = len - other.len; + + if (res > 0) + return -1; + else if (res < 0) + return 1; + else + return id - other.id; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + return obj instanceof SplitSortWrapper && id == ((SplitSortWrapper)obj).id; + } + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/73649386/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java new file mode 100644 index 0000000..f01f72b --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.planner; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.resources.LoggerResource; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS; + +/** + * Base class for map-reduce planners. + */ +public abstract class HadoopAbstractMapReducePlanner implements HadoopMapReducePlanner { + /** Injected grid. */ + @IgniteInstanceResource + protected Ignite ignite; + + /** Logger. */ + @SuppressWarnings("UnusedDeclaration") + @LoggerResource + protected IgniteLogger log; + + /** + * Create plan topology. + * + * @param nodes Topology nodes. + * @return Plan topology. + */ + protected static HadoopMapReducePlanTopology topology(Collection<ClusterNode> nodes) { + Map<String, HadoopMapReducePlanGroup> macsMap = new HashMap<>(nodes.size()); + + Map<UUID, HadoopMapReducePlanGroup> idToGrp = new HashMap<>(nodes.size()); + Map<String, HadoopMapReducePlanGroup> hostToGrp = new HashMap<>(nodes.size()); + + for (ClusterNode node : nodes) { + String macs = node.attribute(ATTR_MACS); + + HadoopMapReducePlanGroup grp = macsMap.get(macs); + + if (grp == null) { + grp = new HadoopMapReducePlanGroup(node, macs); + + macsMap.put(macs, grp); + } + else + grp.add(node); + + idToGrp.put(node.id(), grp); + + for (String host : node.addresses()) { + HadoopMapReducePlanGroup hostGrp = hostToGrp.get(host); + + if (hostGrp == null) + hostToGrp.put(host, grp); + else + assert hostGrp == grp; + } + } + + return new HadoopMapReducePlanTopology(new ArrayList<>(macsMap.values()), idToGrp, hostToGrp); + } + + + /** + * Groups nodes by host names. + * + * @param top Topology to group. + * @return Map. + */ + protected static Map<String, Collection<UUID>> groupByHost(Collection<ClusterNode> top) { + Map<String, Collection<UUID>> grouped = U.newHashMap(top.size()); + + for (ClusterNode node : top) { + for (String host : node.hostNames()) { + Collection<UUID> nodeIds = grouped.get(host); + + if (nodeIds == null) { + // Expecting 1-2 nodes per host. + nodeIds = new ArrayList<>(2); + + grouped.put(host, nodeIds); + } + + nodeIds.add(node.id()); + } + } + + return grouped; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/73649386/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java new file mode 100644 index 0000000..2fe8682 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.planner;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.util.ArrayList;
+import java.util.UUID;
+
+/**
+ * Map-reduce plan group of nodes on a single physical machine.
+ */
+public class HadoopMapReducePlanGroup {
+    /** Node. */
+    private ClusterNode node;
+
+    /** Nodes. */
+    private ArrayList<ClusterNode> nodes;
+
+    /** MAC addresses. */
+    private final String macs;
+
+    /** Weight. */
+    private int weight;
+
+    /**
+     * Constructor.
+     *
+     * @param node First node in the group.
+     * @param macs MAC addresses.
+     */
+    public HadoopMapReducePlanGroup(ClusterNode node, String macs) {
+        assert node != null;
+        assert macs != null;
+
+        this.node = node;
+        this.macs = macs;
+    }
+
+    /**
+     * Add node to the group.
+     *
+     * @param newNode New node.
+     */
+    public void add(ClusterNode newNode) {
+        if (node != null) {
+            nodes = new ArrayList<>(2);
+
+            nodes.add(node);
+
+            node = null;
+        }
+
+        nodes.add(newNode);
+    }
+
+    /**
+     * @return MAC addresses.
+     */
+    public String macs() {
+        return macs;
+    }
+
+    /**
+     * @return {@code True} if only a single node is present.
+     */
+    public boolean single() {
+        return nodeCount() == 1;
+    }
+
+    /**
+     * Get node ID by index.
+     *
+     * @param idx Index.
+     * @return Node ID.
+     */
+    public UUID nodeId(int idx) {
+        ClusterNode res;
+
+        if (node != null) {
+            assert idx == 0;
+
+            res = node;
+        }
+        else {
+            assert nodes != null;
+            assert idx < nodes.size();
+
+            res = nodes.get(idx);
+        }
+
+        assert res != null;
+
+        return res.id();
+    }
+
+    /**
+     * @return Node count.
+     */
+    public int nodeCount() {
+        return node != null ? 1 : nodes.size();
+    }
+
+    /**
+     * @return Weight.
+     */
+    public int weight() {
+        return weight;
+    }
+
+    /**
+     * @param weight Weight to set.
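+     *     (the accumulated load the planner has assigned to this group).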
+ */ + public void weight(int weight) { + this.weight = weight; + } + + + /** {@inheritDoc} */ + @Override public int hashCode() { + return macs.hashCode(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + return obj instanceof HadoopMapReducePlanGroup && F.eq(macs, ((HadoopMapReducePlanGroup)obj).macs); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopMapReducePlanGroup.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/73649386/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java new file mode 100644 index 0000000..fa5c469 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.planner; + +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * Map-reduce plan topology. + */ +public class HadoopMapReducePlanTopology { + /** All groups. */ + private final List<HadoopMapReducePlanGroup> grps; + + /** Node ID to group map. */ + private final Map<UUID, HadoopMapReducePlanGroup> idToGrp; + + /** Host to group map. */ + private final Map<String, HadoopMapReducePlanGroup> hostToGrp; + + /** + * Constructor. + * + * @param grps All groups. + * @param idToGrp ID to group map. + * @param hostToGrp Host to group map. + */ + public HadoopMapReducePlanTopology(List<HadoopMapReducePlanGroup> grps, + Map<UUID, HadoopMapReducePlanGroup> idToGrp, Map<String, HadoopMapReducePlanGroup> hostToGrp) { + assert grps != null; + assert idToGrp != null; + assert hostToGrp != null; + + this.grps = grps; + this.idToGrp = idToGrp; + this.hostToGrp = hostToGrp; + } + + /** + * @return All groups. + */ + public List<HadoopMapReducePlanGroup> groups() { + return grps; + } + + /** + * Get group for node ID. + * + * @param id Node ID. + * @return Group. + */ + public HadoopMapReducePlanGroup groupForId(UUID id) { + return idToGrp.get(id); + } + + /** + * Get group for host. + * + * @param host Host. + * @return Group. 
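+ *     ({@code null} when no group is mapped to the given host).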
+ */ + @Nullable public HadoopMapReducePlanGroup groupForHost(String host) { + return hostToGrp.get(host); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopMapReducePlanTopology.class, this); + } +}
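For reference, a minimal sketch of how the new planner is wired into a node configuration. Only IgniteHadoopWeightedMapReducePlanner and its setPreferLocalReducerThresholdWeight setter come from this commit; the threshold value and the startup wiring are illustrative, assuming the standard HadoopConfiguration plumbing:

import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.HadoopConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.hadoop.mapreduce.IgniteHadoopWeightedMapReducePlanner;

public class WeightedPlannerStartup {
    public static void main(String[] args) {
        // Weight-based planner introduced by this change.
        IgniteHadoopWeightedMapReducePlanner planner = new IgniteHadoopWeightedMapReducePlanner();

        // Tune reducer placement; 10000 is an illustrative value, not a recommendation.
        planner.setPreferLocalReducerThresholdWeight(10000);

        HadoopConfiguration hadoopCfg = new HadoopConfiguration();

        hadoopCfg.setMapReducePlanner(planner);

        IgniteConfiguration cfg = new IgniteConfiguration();

        cfg.setHadoopConfiguration(hadoopCfg);

        Ignition.start(cfg);
    }
}

Note that HadoopAbstractMapReducePlanner.topology() collapses nodes reporting identical MAC addresses into a single HadoopMapReducePlanGroup, so mapper and reducer weights are balanced across physical machines rather than individual JVMs.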