IGNITE-3411: Hadoop: Implemented Hadoop attributes.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9d033e54 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9d033e54 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9d033e54 Branch: refs/heads/ignite-1232 Commit: 9d033e54a69785860759e6994fd3e403b5ed6049 Parents: 58f3d0f Author: vozerov-gridgain <[email protected]> Authored: Fri Jul 1 15:22:39 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Wed Jul 6 09:53:57 2016 +0300 ---------------------------------------------------------------------- .../processors/hadoop/HadoopAttributes.java | 168 +++++++++++++++++++ .../processors/hadoop/HadoopProcessor.java | 49 +++--- 2 files changed, 194 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9d033e54/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java new file mode 100644 index 0000000..23eaa18 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.HadoopConfiguration; +import org.apache.ignite.internal.IgniteNodeAttributes; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Arrays; + +/** + * Hadoop attributes. + */ +public class HadoopAttributes implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Attribute name. */ + public static final String NAME = IgniteNodeAttributes.ATTR_PREFIX + ".hadoop"; + + /** Map-reduce planner class name. */ + private String plannerCls; + + /** External executor flag. */ + private boolean extExec; + + /** Maximum parallel tasks. */ + private int maxParallelTasks; + + /** Maximum task queue size. */ + private int maxTaskQueueSize; + + /** Library names. */ + @GridToStringExclude + private String[] libNames; + + /** Number of cores. */ + private int cores; + + /** + * Get attributes for node (if any). + * + * @param node Node. + * @return Attributes or {@code null} if Hadoop Accelerator is not enabled for node. + */ + @Nullable public static HadoopAttributes forNode(ClusterNode node) { + return node.attribute(NAME); + } + + /** + * {@link Externalizable} support. + */ + public HadoopAttributes() { + // No-op. + } + + /** + * Constructor. + * + * @param cfg Configuration. + */ + public HadoopAttributes(HadoopConfiguration cfg) { + assert cfg != null; + assert cfg.getMapReducePlanner() != null; + + plannerCls = cfg.getMapReducePlanner().getClass().getName(); + + // TODO: IGNITE-404: Get from configuration when fixed. + extExec = false; + + maxParallelTasks = cfg.getMaxParallelTasks(); + maxTaskQueueSize = cfg.getMaxTaskQueueSize(); + libNames = cfg.getNativeLibraryNames(); + + // Cores count already passed in other attributes, we add it here for convenience. + cores = Runtime.getRuntime().availableProcessors(); + } + + /** + * @return Map reduce planner class name. + */ + public String plannerClassName() { + return plannerCls; + } + + /** + * @return External execution flag. + */ + public boolean externalExecution() { + return extExec; + } + + /** + * @return Maximum parallel tasks. + */ + public int maxParallelTasks() { + return maxParallelTasks; + } + + /** + * @return Maximum task queue size. + */ + public int maxTaskQueueSize() { + return maxTaskQueueSize; + } + + + /** + * @return Native library names. + */ + public String[] nativeLibraryNames() { + return libNames; + } + + /** + * @return Number of cores on machine. + */ + public int cores() { + return cores; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(plannerCls); + out.writeBoolean(extExec); + out.writeInt(maxParallelTasks); + out.writeInt(maxTaskQueueSize); + out.writeObject(libNames); + out.writeInt(cores); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + plannerCls = (String)in.readObject(); + extExec = in.readBoolean(); + maxParallelTasks = in.readInt(); + maxTaskQueueSize = in.readInt(); + libNames = (String[])in.readObject(); + cores = in.readInt(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopAttributes.class, this, "libNames", Arrays.toString(libNames)); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9d033e54/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java index 7fc7499..bb10565 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java @@ -17,10 +17,6 @@ package org.apache.ignite.internal.processors.hadoop; -import java.io.IOException; -import java.util.List; -import java.util.ListIterator; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.HadoopConfiguration; import org.apache.ignite.hadoop.mapreduce.IgniteHadoopMapReducePlanner; @@ -34,6 +30,11 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import java.io.IOException; +import java.util.List; +import java.util.ListIterator; +import java.util.concurrent.atomic.AtomicInteger; + /** * Hadoop processor. */ @@ -85,16 +86,24 @@ public class HadoopProcessor extends HadoopProcessorAdapter { c.start(hctx); hadoop = new HadoopImpl(this); + + ctx.addNodeAttribute(HadoopAttributes.NAME, new HadoopAttributes(cfg)); } /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopProcessor.class, this); + @Override public void onKernalStart() throws IgniteCheckedException { + super.onKernalStart(); + + if (hctx == null) + return; + + for (HadoopComponent c : hctx.components()) + c.onKernalStart(); } /** {@inheritDoc} */ - @Override public void stop(boolean cancel) throws IgniteCheckedException { - super.stop(cancel); + @Override public void onKernalStop(boolean cancel) { + super.onKernalStop(cancel); if (hctx == null) return; @@ -104,24 +113,13 @@ public class HadoopProcessor extends HadoopProcessorAdapter { for (ListIterator<HadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) { HadoopComponent c = it.previous(); - c.stop(cancel); + c.onKernalStop(cancel); } } /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { - super.onKernalStart(); - - if (hctx == null) - return; - - for (HadoopComponent c : hctx.components()) - c.onKernalStart(); - } - - /** {@inheritDoc} */ - @Override public void onKernalStop(boolean cancel) { - super.onKernalStop(cancel); + @Override public void stop(boolean cancel) throws IgniteCheckedException { + super.stop(cancel); if (hctx == null) return; @@ -131,7 +129,7 @@ public class HadoopProcessor extends HadoopProcessorAdapter { for (ListIterator<HadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) { HadoopComponent c = it.previous(); - c.onKernalStop(cancel); + c.stop(cancel); } } @@ -217,4 +215,9 @@ public class HadoopProcessor extends HadoopProcessorAdapter { if (cfg.getMapReducePlanner() == null) cfg.setMapReducePlanner(new IgniteHadoopMapReducePlanner()); } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopProcessor.class, this); + } } \ No newline at end of file
