http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java new file mode 100644 index 0000000..37f81a6 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import java.io.IOException; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.OutputCommitter; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.TaskAttemptContext; +import org.apache.hadoop.mapred.TaskAttemptContextImpl; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; +import org.jetbrains.annotations.Nullable; + +/** + * Hadoop output collector. + */ +public class HadoopV1OutputCollector implements OutputCollector { + /** Job configuration. */ + private final JobConf jobConf; + + /** Task context. */ + private final HadoopTaskContext taskCtx; + + /** Optional direct writer. */ + private final RecordWriter writer; + + /** Task attempt. */ + private final TaskAttemptID attempt; + + /** + * @param jobConf Job configuration. + * @param taskCtx Task context. + * @param directWrite Direct write flag. + * @param fileName File name. + * @throws IOException In case of IO exception. 
+     */
+    HadoopV1OutputCollector(JobConf jobConf, HadoopTaskContext taskCtx, boolean directWrite,
+        @Nullable String fileName, TaskAttemptID attempt) throws IOException {
+        this.jobConf = jobConf;
+        this.taskCtx = taskCtx;
+        this.attempt = attempt;
+
+        if (directWrite) {
+            jobConf.set("mapreduce.task.attempt.id", attempt.toString());
+
+            OutputFormat outFormat = jobConf.getOutputFormat();
+
+            writer = outFormat.getRecordWriter(null, jobConf, fileName, Reporter.NULL);
+        }
+        else
+            writer = null;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void collect(Object key, Object val) throws IOException {
+        if (writer != null)
+            writer.write(key, val);
+        else {
+            try {
+                taskCtx.output().write(key, val);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IOException(e);
+            }
+        }
+    }
+
+    /**
+     * Close writer.
+     *
+     * @throws IOException In case of IO exception.
+     */
+    public void closeWriter() throws IOException {
+        if (writer != null)
+            writer.close(Reporter.NULL);
+    }
+
+    /**
+     * Setup task.
+     *
+     * @throws IOException If failed.
+     */
+    public void setup() throws IOException {
+        if (writer != null)
+            jobConf.getOutputCommitter().setupTask(new TaskAttemptContextImpl(jobConf, attempt));
+    }
+
+    /**
+     * Commit task.
+     *
+     * @throws IOException If failed.
+     */
+    public void commit() throws IOException {
+        if (writer != null) {
+            OutputCommitter outputCommitter = jobConf.getOutputCommitter();
+
+            TaskAttemptContext taskCtx = new TaskAttemptContextImpl(jobConf, attempt);
+
+            if (outputCommitter.needsTaskCommit(taskCtx))
+                outputCommitter.commitTask(taskCtx);
+        }
+    }
+
+    /**
+     * Abort task.
+     */
+    public void abort() {
+        try {
+            if (writer != null)
+                jobConf.getOutputCommitter().abortTask(new TaskAttemptContextImpl(jobConf, attempt));
+        }
+        catch (IOException ignore) {
+            // No-op.
+        }
+    }
+}
\ No newline at end of file
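The collector above is driven by the v1 task classes later in this patch (see HadoopV1Task.collector() and HadoopV1ReduceTask.run()): setup() runs once per attempt, collect() is called per emitted pair, then closeWriter() and commit(), with abort() on failure. A minimal lifecycle sketch under those assumptions follows; the sketch class, the writeTaskOutput helper and the literal file name are illustrative only and are not part of this commit (the constructor is package-private, so such a caller would have to live in the same package).

// Illustrative sketch of the HadoopV1OutputCollector lifecycle; not part of this commit.
package org.apache.ignite.internal.processors.hadoop.v1;

import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;

class HadoopV1OutputCollectorSketch {
    /** Writes all pairs through a direct-write collector and commits the task attempt. */
    static void writeTaskOutput(JobConf jobConf, HadoopTaskContext taskCtx, TaskAttemptID attempt,
        Map<Object, Object> pairs) throws IOException {
        // directWrite=true bypasses the Ignite shuffle output and writes via the job's OutputFormat.
        HadoopV1OutputCollector collector =
            new HadoopV1OutputCollector(jobConf, taskCtx, true, "part-00000", attempt);

        collector.setup(); // Lets the output committer prepare the task attempt.

        try {
            for (Map.Entry<Object, Object> pair : pairs.entrySet())
                collector.collect(pair.getKey(), pair.getValue());

            collector.closeWriter(); // Closes the underlying record writer.
            collector.commit();      // Commits the attempt if the committer requires it.
        }
        catch (IOException e) {
            collector.abort(); // Best-effort cleanup of the failed attempt.

            throw e;
        }
    }
}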
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java new file mode 100644 index 0000000..0ab1bba --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.Partitioner; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner; + +/** + * Hadoop partitioner adapter for v1 API. + */ +public class HadoopV1Partitioner implements HadoopPartitioner { + /** Partitioner instance. */ + private Partitioner<Object, Object> part; + + /** + * @param cls Hadoop partitioner class. + * @param conf Job configuration. + */ + public HadoopV1Partitioner(Class<? extends Partitioner> cls, Configuration conf) { + part = (Partitioner<Object, Object>) ReflectionUtils.newInstance(cls, conf); + } + + /** {@inheritDoc} */ + @Override public int partition(Object key, Object val, int parts) { + return part.getPartition(key, val, parts); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java new file mode 100644 index 0000000..e656695 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopJob; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput; +import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext; + +/** + * Hadoop reduce task implementation for v1 API. + */ +public class HadoopV1ReduceTask extends HadoopV1Task { + /** {@code True} if reduce, {@code false} if combine. */ + private final boolean reduce; + + /** + * Constructor. + * + * @param taskInfo Task info. + * @param reduce {@code True} if reduce, {@code false} if combine. + */ + public HadoopV1ReduceTask(HadoopTaskInfo taskInfo, boolean reduce) { + super(taskInfo); + + this.reduce = reduce; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException { + HadoopJob job = taskCtx.job(); + + HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; + + JobConf jobConf = ctx.jobConf(); + + HadoopTaskInput input = taskCtx.input(); + + HadoopV1OutputCollector collector = null; + + try { + collector = collector(jobConf, ctx, reduce || !job.info().hasReducer(), fileName(), ctx.attemptId()); + + Reducer reducer; + if (reduce) reducer = ReflectionUtils.newInstance(jobConf.getReducerClass(), + jobConf); + else reducer = ReflectionUtils.newInstance(jobConf.getCombinerClass(), + jobConf); + + assert reducer != null; + + try { + try { + while (input.next()) { + if (isCancelled()) + throw new HadoopTaskCancelledException("Reduce task cancelled."); + + reducer.reduce(input.key(), input.values(), collector, Reporter.NULL); + } + } + finally { + reducer.close(); + } + } + finally { + collector.closeWriter(); + } + + collector.commit(); + } + catch (Exception e) { + if (collector != null) + collector.abort(); + + throw new IgniteCheckedException(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java new file mode 100644 index 0000000..5a63aab --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.Reporter; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter; + +/** + * Hadoop reporter implementation for v1 API. + */ +public class HadoopV1Reporter implements Reporter { + /** Context. */ + private final HadoopTaskContext ctx; + + /** + * Creates new instance. + * + * @param ctx Context. + */ + public HadoopV1Reporter(HadoopTaskContext ctx) { + this.ctx = ctx; + } + + /** {@inheritDoc} */ + @Override public void setStatus(String status) { + // TODO + } + + /** {@inheritDoc} */ + @Override public Counters.Counter getCounter(Enum<?> name) { + return getCounter(name.getDeclaringClass().getName(), name.name()); + } + + /** {@inheritDoc} */ + @Override public Counters.Counter getCounter(String grp, String name) { + return new HadoopV1Counter(ctx.counter(grp, name, HadoopLongCounter.class)); + } + + /** {@inheritDoc} */ + @Override public void incrCounter(Enum<?> key, long amount) { + getCounter(key).increment(amount); + } + + /** {@inheritDoc} */ + @Override public void incrCounter(String grp, String cntr, long amount) { + getCounter(grp, cntr).increment(amount); + } + + /** {@inheritDoc} */ + @Override public InputSplit getInputSplit() throws UnsupportedOperationException { + throw new UnsupportedOperationException("reporter has no input"); // TODO + } + + /** {@inheritDoc} */ + @Override public float getProgress() { + return 0.5f; // TODO + } + + /** {@inheritDoc} */ + @Override public void progress() { + // TODO + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java new file mode 100644 index 0000000..d2f6823 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import java.io.IOException; +import org.apache.hadoop.mapred.OutputCommitter; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; +import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext; + +/** + * Hadoop setup task implementation for v1 API. + */ +public class HadoopV1SetupTask extends HadoopV1Task { + /** + * Constructor. + * + * @param taskInfo Task info. + */ + public HadoopV1SetupTask(HadoopTaskInfo taskInfo) { + super(taskInfo); + } + + /** {@inheritDoc} */ + @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException { + HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; + + try { + ctx.jobConf().getOutputFormat().checkOutputSpecs(null, ctx.jobConf()); + + OutputCommitter committer = ctx.jobConf().getOutputCommitter(); + + if (committer != null) + committer.setupJob(ctx.jobContext()); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java new file mode 100644 index 0000000..203def4 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; +import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; +import org.apache.ignite.internal.processors.hadoop.HadoopUtils; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +/** + * Hadoop API v1 splitter. + */ +public class HadoopV1Splitter { + /** */ + private static final String[] EMPTY_HOSTS = {}; + + /** + * @param jobConf Job configuration. + * @return Collection of mapped splits. + * @throws IgniteCheckedException If mapping failed. + */ + public static Collection<HadoopInputSplit> splitJob(JobConf jobConf) throws IgniteCheckedException { + try { + InputFormat<?, ?> format = jobConf.getInputFormat(); + + assert format != null; + + InputSplit[] splits = format.getSplits(jobConf, 0); + + Collection<HadoopInputSplit> res = new ArrayList<>(splits.length); + + for (int i = 0; i < splits.length; i++) { + InputSplit nativeSplit = splits[i]; + + if (nativeSplit instanceof FileSplit) { + FileSplit s = (FileSplit)nativeSplit; + + res.add(new HadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength())); + } + else + res.add(HadoopUtils.wrapSplit(i, nativeSplit, nativeSplit.getLocations())); + } + + return res; + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } + + /** + * @param clsName Input split class name. + * @param in Input stream. + * @param hosts Optional hosts. + * @return File block or {@code null} if it is not a {@link FileSplit} instance. + * @throws IgniteCheckedException If failed. + */ + @Nullable public static HadoopFileBlock readFileBlock(String clsName, FSDataInputStream in, + @Nullable String[] hosts) throws IgniteCheckedException { + if (!FileSplit.class.getName().equals(clsName)) + return null; + + FileSplit split = U.newInstance(FileSplit.class); + + try { + split.readFields(in); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + + if (hosts == null) + hosts = EMPTY_HOSTS; + + return new HadoopFileBlock(hosts, split.getPath().toUri(), split.getStart(), split.getLength()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java new file mode 100644 index 0000000..a89323c --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import java.io.IOException; +import java.text.NumberFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.ignite.internal.processors.hadoop.HadoopTask; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; +import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext; +import org.jetbrains.annotations.Nullable; + +/** + * Extended Hadoop v1 task. + */ +public abstract class HadoopV1Task extends HadoopTask { + /** Indicates that this task is to be cancelled. */ + private volatile boolean cancelled; + + /** + * Constructor. + * + * @param taskInfo Task info. + */ + protected HadoopV1Task(HadoopTaskInfo taskInfo) { + super(taskInfo); + } + + /** + * Gets file name for that task result. + * + * @return File name. + */ + public String fileName() { + NumberFormat numFormat = NumberFormat.getInstance(); + + numFormat.setMinimumIntegerDigits(5); + numFormat.setGroupingUsed(false); + + return "part-" + numFormat.format(info().taskNumber()); + } + + /** + * + * @param jobConf Job configuration. + * @param taskCtx Task context. + * @param directWrite Direct write flag. + * @param fileName File name. + * @param attempt Attempt of task. + * @return Collector. + * @throws IOException In case of IO exception. + */ + protected HadoopV1OutputCollector collector(JobConf jobConf, HadoopV2TaskContext taskCtx, + boolean directWrite, @Nullable String fileName, TaskAttemptID attempt) throws IOException { + HadoopV1OutputCollector collector = new HadoopV1OutputCollector(jobConf, taskCtx, directWrite, + fileName, attempt) { + /** {@inheritDoc} */ + @Override public void collect(Object key, Object val) throws IOException { + if (cancelled) + throw new HadoopTaskCancelledException("Task cancelled."); + + super.collect(key, val); + } + }; + + collector.setup(); + + return collector; + } + + /** {@inheritDoc} */ + @Override public void cancel() { + cancelled = true; + } + + /** Returns true if task is cancelled. 
*/ + public boolean isCancelled() { + return cancelled; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java new file mode 100644 index 0000000..9632525 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import java.util.Collection; +import java.util.LinkedList; + +/** + * Replacement for Hadoop {@code org.apache.hadoop.util.Daemon} class. + */ +@SuppressWarnings("UnusedDeclaration") +public class HadoopDaemon extends Thread { + /** Lock object used for synchronization. */ + private static final Object lock = new Object(); + + /** Collection to hold the threads to be stopped. */ + private static Collection<HadoopDaemon> daemons = new LinkedList<>(); + + { + setDaemon(true); // always a daemon + } + + /** Runnable of this thread, may be this. */ + final Runnable runnable; + + /** + * Construct a daemon thread. + */ + public HadoopDaemon() { + super(); + + runnable = this; + + enqueueIfNeeded(); + } + + /** + * Construct a daemon thread. + */ + public HadoopDaemon(Runnable runnable) { + super(runnable); + + this.runnable = runnable; + + this.setName(runnable.toString()); + + enqueueIfNeeded(); + } + + /** + * Construct a daemon thread to be part of a specified thread group. + */ + public HadoopDaemon(ThreadGroup grp, Runnable runnable) { + super(grp, runnable); + + this.runnable = runnable; + + this.setName(runnable.toString()); + + enqueueIfNeeded(); + } + + /** + * Getter for the runnable. May return this. + * + * @return the runnable + */ + public Runnable getRunnable() { + return runnable; + } + + /** + * if the runnable is a Hadoop org.apache.hadoop.hdfs.PeerCache Runnable. + * + * @param r the runnable. + * @return true if it is. + */ + private static boolean isPeerCacheRunnable(Runnable r) { + String name = r.getClass().getName(); + + return name.startsWith("org.apache.hadoop.hdfs.PeerCache"); + } + + /** + * Enqueue this thread if it should be stopped upon the task end. 
+ */ + private void enqueueIfNeeded() { + synchronized (lock) { + if (daemons == null) + throw new RuntimeException("Failed to create HadoopDaemon (its registry is already cleared): " + + "[classLoader=" + getClass().getClassLoader() + ']'); + + if (runnable.getClass().getClassLoader() == getClass().getClassLoader() && isPeerCacheRunnable(runnable)) + daemons.add(this); + } + } + + /** + * Stops all the registered threads. + */ + public static void dequeueAndStopAll() { + synchronized (lock) { + if (daemons != null) { + for (HadoopDaemon daemon : daemons) + daemon.interrupt(); + + daemons = null; + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopExternalSplit.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopExternalSplit.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopExternalSplit.java new file mode 100644 index 0000000..c7e8a0a --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopExternalSplit.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; + +/** + * Split serialized in external file. + */ +public class HadoopExternalSplit extends HadoopInputSplit { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long off; + + /** + * For {@link Externalizable}. + */ + public HadoopExternalSplit() { + // No-op. + } + + /** + * @param hosts Hosts. + * @param off Offset of this split in external file. + */ + public HadoopExternalSplit(String[] hosts, long off) { + assert off >= 0 : off; + assert hosts != null; + + this.hosts = hosts; + this.off = off; + } + + /** + * @return Offset of this input split in external file. 
+ */ + public long offset() { + return off; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeLong(off); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + off = in.readLong(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + HadoopExternalSplit that = (HadoopExternalSplit) o; + + return off == that.off; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return (int)(off ^ (off >>> 32)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSerializationWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSerializationWrapper.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSerializationWrapper.java new file mode 100644 index 0000000..844e7f8 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSerializationWrapper.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.hadoop.io.serializer.Deserializer; +import org.apache.hadoop.io.serializer.Serialization; +import org.apache.hadoop.io.serializer.Serializer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopSerialization; +import org.jetbrains.annotations.Nullable; + +/** + * The wrapper around external serializer. + */ +public class HadoopSerializationWrapper<T> implements HadoopSerialization { + /** External serializer - writer. */ + private final Serializer<T> serializer; + + /** External serializer - reader. */ + private final Deserializer<T> deserializer; + + /** Data output for current write operation. */ + private OutputStream currOut; + + /** Data input for current read operation. */ + private InputStream currIn; + + /** Wrapper around current output to provide OutputStream interface. 
*/ + private final OutputStream outStream = new OutputStream() { + /** {@inheritDoc} */ + @Override public void write(int b) throws IOException { + currOut.write(b); + } + + /** {@inheritDoc} */ + @Override public void write(byte[] b, int off, int len) throws IOException { + currOut.write(b, off, len); + } + }; + + /** Wrapper around current input to provide InputStream interface. */ + private final InputStream inStream = new InputStream() { + /** {@inheritDoc} */ + @Override public int read() throws IOException { + return currIn.read(); + } + + /** {@inheritDoc} */ + @Override public int read(byte[] b, int off, int len) throws IOException { + return currIn.read(b, off, len); + } + }; + + /** + * @param serialization External serializer to wrap. + * @param cls The class to serialize. + */ + public HadoopSerializationWrapper(Serialization<T> serialization, Class<T> cls) throws IgniteCheckedException { + assert cls != null; + + serializer = serialization.getSerializer(cls); + deserializer = serialization.getDeserializer(cls); + + try { + serializer.open(outStream); + deserializer.open(inStream); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } + + /** {@inheritDoc} */ + @Override public void write(DataOutput out, Object obj) throws IgniteCheckedException { + assert out != null; + assert obj != null; + + try { + currOut = (OutputStream)out; + + serializer.serialize((T)obj); + + currOut = null; + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } + + /** {@inheritDoc} */ + @Override public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException { + assert in != null; + + try { + currIn = (InputStream)in; + + T res = deserializer.deserialize((T) obj); + + currIn = null; + + return res; + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteCheckedException { + try { + serializer.close(); + deserializer.close(); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopShutdownHookManager.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopShutdownHookManager.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopShutdownHookManager.java new file mode 100644 index 0000000..8bd71e0 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopShutdownHookManager.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Fake manager for shutdown hooks.
+ */
+public class HadoopShutdownHookManager {
+    /** */
+    private static final HadoopShutdownHookManager MGR = new HadoopShutdownHookManager();
+
+    /**
+     * Return <code>ShutdownHookManager</code> singleton.
+     *
+     * @return <code>ShutdownHookManager</code> singleton.
+     */
+    public static HadoopShutdownHookManager get() {
+        return MGR;
+    }
+
+    /** */
+    private Set<Runnable> hooks = Collections.synchronizedSet(new HashSet<Runnable>());
+
+    /** */
+    private AtomicBoolean shutdownInProgress = new AtomicBoolean(false);
+
+    /**
+     * Singleton.
+     */
+    private HadoopShutdownHookManager() {
+        // No-op.
+    }
+
+    /**
+     * Adds a shutdownHook with a priority: the higher the priority,
+     * the earlier it will run. ShutdownHooks with the same priority run
+     * in a non-deterministic order.
+     *
+     * @param shutdownHook shutdownHook <code>Runnable</code>
+     * @param priority priority of the shutdownHook.
+     */
+    public void addShutdownHook(Runnable shutdownHook, int priority) {
+        if (shutdownHook == null)
+            throw new IllegalArgumentException("shutdownHook cannot be NULL");
+
+        hooks.add(shutdownHook);
+    }
+
+    /**
+     * Removes a shutdownHook.
+     *
+     * @param shutdownHook shutdownHook to remove.
+     * @return TRUE if the shutdownHook was registered and removed,
+     * FALSE otherwise.
+     */
+    public boolean removeShutdownHook(Runnable shutdownHook) {
+        return hooks.remove(shutdownHook);
+    }
+
+    /**
+     * Indicates if a shutdownHook is registered or not.
+     *
+     * @param shutdownHook shutdownHook to check if registered.
+     * @return TRUE/FALSE depending on whether the shutdownHook is registered.
+     */
+    public boolean hasShutdownHook(Runnable shutdownHook) {
+        return hooks.contains(shutdownHook);
+    }
+
+    /**
+     * Indicates if shutdown is in progress or not.
+     *
+     * @return TRUE if the shutdown is in progress, otherwise FALSE.
+     */
+    public boolean isShutdownInProgress() {
+        return shutdownInProgress.get();
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSplitWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSplitWrapper.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSplitWrapper.java
new file mode 100644
index 0000000..df77adb
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSplitWrapper.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * The wrapper for native hadoop input splits. + * + * Warning!! This class must not depend on any Hadoop classes directly or indirectly. + */ +public class HadoopSplitWrapper extends HadoopInputSplit { + /** */ + private static final long serialVersionUID = 0L; + + /** Native hadoop input split. */ + private byte[] bytes; + + /** */ + private String clsName; + + /** Internal ID */ + private int id; + + /** + * Creates new split wrapper. + */ + public HadoopSplitWrapper() { + // No-op. + } + + /** + * Creates new split wrapper. + * + * @param id Split ID. + * @param clsName Class name. + * @param bytes Serialized class. + * @param hosts Hosts where split is located. + */ + public HadoopSplitWrapper(int id, String clsName, byte[] bytes, String[] hosts) { + assert hosts != null; + assert clsName != null; + assert bytes != null; + + this.hosts = hosts; + this.id = id; + + this.clsName = clsName; + this.bytes = bytes; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(id); + + out.writeUTF(clsName); + U.writeByteArray(out, bytes); + } + + /** + * @return Class name. + */ + public String className() { + return clsName; + } + + /** + * @return Class bytes. + */ + public byte[] bytes() { + return bytes; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + id = in.readInt(); + + clsName = in.readUTF(); + bytes = U.readByteArray(in); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + HadoopSplitWrapper that = (HadoopSplitWrapper)o; + + return id == that.id; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2CleanupTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2CleanupTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2CleanupTask.java new file mode 100644 index 0000000..abb904c --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2CleanupTask.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import java.io.IOException; +import org.apache.hadoop.mapred.JobContextImpl; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; + +/** + * Hadoop cleanup task (commits or aborts job). + */ +public class HadoopV2CleanupTask extends HadoopV2Task { + /** Abort flag. */ + private final boolean abort; + + /** + * @param taskInfo Task info. + * @param abort Abort flag. + */ + public HadoopV2CleanupTask(HadoopTaskInfo taskInfo, boolean abort) { + super(taskInfo); + + this.abort = abort; + } + + /** {@inheritDoc} */ + @SuppressWarnings("ConstantConditions") + @Override public void run0(HadoopV2TaskContext taskCtx) throws IgniteCheckedException { + JobContextImpl jobCtx = taskCtx.jobContext(); + + try { + OutputFormat outputFormat = getOutputFormat(jobCtx); + + OutputCommitter committer = outputFormat.getOutputCommitter(hadoopContext()); + + if (committer != null) { + if (abort) + committer.abortJob(jobCtx, JobStatus.State.FAILED); + else + committer.commitJob(jobCtx); + } + } + catch (ClassNotFoundException | IOException e) { + throw new IgniteCheckedException(e); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedCheckedException(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java new file mode 100644 index 0000000..2ff2945 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.ReduceContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter;
+
+/**
+ * Hadoop context implementation for v2 API. It provides IO operations for hadoop tasks.
+ */
+public class HadoopV2Context extends JobContextImpl implements MapContext, ReduceContext {
+    /** Input reader that overrides the HadoopTaskContext input. */
+    private RecordReader reader;
+
+    /** Output writer that overrides the HadoopTaskContext output. */
+    private RecordWriter writer;
+
+    /** Output is provided by executor environment. */
+    private final HadoopTaskOutput output;
+
+    /** Input is provided by executor environment. */
+    private final HadoopTaskInput input;
+
+    /** Unique identifier for a task attempt. */
+    private final TaskAttemptID taskAttemptID;
+
+    /** Indicates that this task is to be cancelled. */
+    private volatile boolean cancelled;
+
+    /** Input split. */
+    private InputSplit inputSplit;
+
+    /** */
+    private final HadoopTaskContext ctx;
+
+    /** */
+    private String status;
+
+    /**
+     * @param ctx Context for IO operations.
+ */ + public HadoopV2Context(HadoopV2TaskContext ctx) { + super(ctx.jobConf(), ctx.jobContext().getJobID()); + + taskAttemptID = ctx.attemptId(); + + conf.set("mapreduce.job.id", taskAttemptID.getJobID().toString()); + conf.set("mapreduce.task.id", taskAttemptID.getTaskID().toString()); + + output = ctx.output(); + input = ctx.input(); + + this.ctx = ctx; + } + + /** {@inheritDoc} */ + @Override public InputSplit getInputSplit() { + if (inputSplit == null) { + HadoopInputSplit split = ctx.taskInfo().inputSplit(); + + if (split == null) + return null; + + if (split instanceof HadoopFileBlock) { + HadoopFileBlock fileBlock = (HadoopFileBlock)split; + + inputSplit = new FileSplit(new Path(fileBlock.file()), fileBlock.start(), fileBlock.length(), null); + } + else + { + try { + inputSplit = (InputSplit) ((HadoopV2TaskContext)ctx).getNativeSplit(split); + } catch (IgniteCheckedException e) { + throw new IllegalStateException(e); + } + } + } + + return inputSplit; + } + + /** {@inheritDoc} */ + @Override public boolean nextKeyValue() throws IOException, InterruptedException { + if (cancelled) + throw new HadoopTaskCancelledException("Task cancelled."); + + return reader.nextKeyValue(); + } + + /** {@inheritDoc} */ + @Override public Object getCurrentKey() throws IOException, InterruptedException { + if (reader != null) + return reader.getCurrentKey(); + + return input.key(); + } + + /** {@inheritDoc} */ + @Override public Object getCurrentValue() throws IOException, InterruptedException { + return reader.getCurrentValue(); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void write(Object key, Object val) throws IOException, InterruptedException { + if (cancelled) + throw new HadoopTaskCancelledException("Task cancelled."); + + if (writer != null) + writer.write(key, val); + else { + try { + output.write(key, val); + } + catch (IgniteCheckedException e) { + throw new IOException(e); + } + } + } + + /** {@inheritDoc} */ + @Override public OutputCommitter getOutputCommitter() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public TaskAttemptID getTaskAttemptID() { + return taskAttemptID; + } + + /** {@inheritDoc} */ + @Override public void setStatus(String msg) { + status = msg; + } + + /** {@inheritDoc} */ + @Override public String getStatus() { + return status; + } + + /** {@inheritDoc} */ + @Override public float getProgress() { + return 0.5f; // TODO + } + + /** {@inheritDoc} */ + @Override public Counter getCounter(Enum<?> cntrName) { + return getCounter(cntrName.getDeclaringClass().getName(), cntrName.name()); + } + + /** {@inheritDoc} */ + @Override public Counter getCounter(String grpName, String cntrName) { + return new HadoopV2Counter(ctx.counter(grpName, cntrName, HadoopLongCounter.class)); + } + + /** {@inheritDoc} */ + @Override public void progress() { + // No-op. + } + + /** + * Overrides default input data reader. + * + * @param reader New reader. + */ + public void reader(RecordReader reader) { + this.reader = reader; + } + + /** {@inheritDoc} */ + @Override public boolean nextKey() throws IOException, InterruptedException { + if (cancelled) + throw new HadoopTaskCancelledException("Task cancelled."); + + return input.next(); + } + + /** {@inheritDoc} */ + @Override public Iterable getValues() throws IOException, InterruptedException { + return new Iterable() { + @Override public Iterator iterator() { + return input.values(); + } + }; + } + + /** + * @return Overridden output data writer. 
+     */
+    public RecordWriter writer() {
+        return writer;
+    }
+
+    /**
+     * Overrides default output data writer.
+     *
+     * @param writer New writer.
+     */
+    public void writer(RecordWriter writer) {
+        this.writer = writer;
+    }
+
+    /**
+     * Cancels the task by stopping the IO.
+     */
+    public void cancel() {
+        cancelled = true;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Counter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Counter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Counter.java
new file mode 100644
index 0000000..cad9e64
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Counter.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter;
+
+/**
+ * Adapter from own counter implementation into Hadoop API Counter of version 2.0.
+ */
+public class HadoopV2Counter implements Counter {
+    /** Delegate. */
+    private final HadoopLongCounter cntr;
+
+    /**
+     * Creates new instance with given delegate.
+     *
+     * @param cntr Internal counter.
+     */
+    public HadoopV2Counter(HadoopLongCounter cntr) {
+        assert cntr != null : "counter must be non-null";
+
+        this.cntr = cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setDisplayName(String displayName) {
+        // No-op.
+ } + + /** {@inheritDoc} */ + @Override public String getName() { + return cntr.name(); + } + + /** {@inheritDoc} */ + @Override public String getDisplayName() { + return getName(); + } + + /** {@inheritDoc} */ + @Override public long getValue() { + return cntr.value(); + } + + /** {@inheritDoc} */ + @Override public void setValue(long val) { + cntr.value(val); + } + + /** {@inheritDoc} */ + @Override public void increment(long incr) { + cntr.increment(incr); + } + + /** {@inheritDoc} */ + @Override public Counter getUnderlyingCounter() { + return this; + } + + /** {@inheritDoc} */ + @Override public void write(DataOutput out) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } + + /** {@inheritDoc} */ + @Override public void readFields(DataInput in) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java new file mode 100644 index 0000000..595474c --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContextImpl; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapreduce.JobSubmissionFiles; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.split.JobSplit; +import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader; +import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo; +import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; +import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; +import org.apache.ignite.internal.processors.hadoop.HadoopJob; +import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskType; +import org.apache.ignite.internal.processors.hadoop.HadoopUtils; +import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils; +import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap; +import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1Splitter; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; +import org.jsr166.ConcurrentHashMap8; + +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.jobLocalDir; +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.taskLocalDir; +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.transformException; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.FsCacheKey; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.createHadoopLazyConcurrentMap; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.fileSystemForMrUserWithCaching; + +/** + * Hadoop job implementation for v2 API. + */ +public class HadoopV2Job implements HadoopJob { + /** */ + private final JobConf jobConf; + + /** */ + private final JobContextImpl jobCtx; + + /** Hadoop job ID. */ + private final HadoopJobId jobId; + + /** Job info. 
*/ + protected final HadoopJobInfo jobInfo; + + /** Native library names. */ + private final String[] libNames; + + /** */ + private final JobID hadoopJobID; + + /** */ + private final HadoopV2JobResourceManager rsrcMgr; + + /** */ + private final ConcurrentMap<T2<HadoopTaskType, Integer>, GridFutureAdapter<HadoopTaskContext>> ctxs = + new ConcurrentHashMap8<>(); + + /** Pool of task context classes and thus of class loading environments. */ + private final Queue<Class<? extends HadoopTaskContext>> taskCtxClsPool = new ConcurrentLinkedQueue<>(); + + /** All created contexts. */ + private final Queue<Class<? extends HadoopTaskContext>> fullCtxClsQueue = new ConcurrentLinkedDeque<>(); + + /** File system cache map. */ + private final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fsMap = createHadoopLazyConcurrentMap(); + + /** Local node ID. */ + private volatile UUID locNodeId; + + /** Serialized JobConf. */ + private volatile byte[] jobConfData; + + /** + * Constructor. + * + * @param jobId Job ID. + * @param jobInfo Job info. + * @param log Logger. + * @param libNames Optional additional native library names. + */ + public HadoopV2Job(HadoopJobId jobId, final HadoopDefaultJobInfo jobInfo, IgniteLogger log, + @Nullable String[] libNames) { + assert jobId != null; + assert jobInfo != null; + + this.jobId = jobId; + this.jobInfo = jobInfo; + this.libNames = libNames; + + ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader()); + + try { + hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId()); + + jobConf = new JobConf(); + + HadoopFileSystemsUtils.setupFileSystems(jobConf); + + for (Map.Entry<String,String> e : jobInfo.properties().entrySet()) + jobConf.set(e.getKey(), e.getValue()); + + jobCtx = new JobContextImpl(jobConf, hadoopJobID); + + rsrcMgr = new HadoopV2JobResourceManager(jobId, jobCtx, log, this); + } + finally { + HadoopUtils.setContextClassLoader(oldLdr); + } + } + + /** {@inheritDoc} */ + @Override public HadoopJobId id() { + return jobId; + } + + /** {@inheritDoc} */ + @Override public HadoopJobInfo info() { + return jobInfo; + } + + /** {@inheritDoc} */ + @Override public Collection<HadoopInputSplit> input() throws IgniteCheckedException { + ClassLoader oldLdr = HadoopUtils.setContextClassLoader(jobConf.getClassLoader()); + + try { + String jobDirPath = jobConf.get(MRJobConfig.MAPREDUCE_JOB_DIR); + + if (jobDirPath == null) { // The job was probably not submitted by the Hadoop client. + // Assume that the needed classes are available and try to generate input splits ourselves.
+ if (jobConf.getUseNewMapper()) + return HadoopV2Splitter.splitJob(jobCtx); + else + return HadoopV1Splitter.splitJob(jobConf); + } + + Path jobDir = new Path(jobDirPath); + + try { + FileSystem fs = fileSystem(jobDir.toUri(), jobConf); + + JobSplit.TaskSplitMetaInfo[] metaInfos = SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf, + jobDir); + + if (F.isEmpty(metaInfos)) + throw new IgniteCheckedException("No input splits found."); + + Path splitsFile = JobSubmissionFiles.getJobSplitFile(jobDir); + + try (FSDataInputStream in = fs.open(splitsFile)) { + Collection<HadoopInputSplit> res = new ArrayList<>(metaInfos.length); + + for (JobSplit.TaskSplitMetaInfo metaInfo : metaInfos) { + long off = metaInfo.getStartOffset(); + + String[] hosts = metaInfo.getLocations(); + + in.seek(off); + + String clsName = Text.readString(in); + + HadoopFileBlock block = HadoopV1Splitter.readFileBlock(clsName, in, hosts); + + if (block == null) + block = HadoopV2Splitter.readFileBlock(clsName, in, hosts); + + res.add(block != null ? block : new HadoopExternalSplit(hosts, off)); + } + + return res; + } + } + catch (Throwable e) { + if (e instanceof Error) + throw (Error)e; + else + throw transformException(e); + } + } + finally { + HadoopUtils.restoreContextClassLoader(oldLdr); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings({"unchecked", "MismatchedQueryAndUpdateOfCollection" }) + @Override public HadoopTaskContext getTaskContext(HadoopTaskInfo info) throws IgniteCheckedException { + T2<HadoopTaskType, Integer> locTaskId = new T2<>(info.type(), info.taskNumber()); + + GridFutureAdapter<HadoopTaskContext> fut = ctxs.get(locTaskId); + + if (fut != null) + return fut.get(); + + GridFutureAdapter<HadoopTaskContext> old = ctxs.putIfAbsent(locTaskId, fut = new GridFutureAdapter<>()); + + if (old != null) + return old.get(); + + Class<? extends HadoopTaskContext> cls = taskCtxClsPool.poll(); + + try { + if (cls == null) { + // If there is no pooled class, then load a new one. + // Note that the class loader is identified by the task it was initially created for, + // but later it may be reused for other tasks. + HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath(), + HadoopClassLoader.nameForTask(info, false), libNames); + + cls = (Class<?
extends HadoopTaskContext>)ldr.loadClass(HadoopV2TaskContext.class.getName()); + + fullCtxClsQueue.add(cls); + } + + Constructor<?> ctr = cls.getConstructor(HadoopTaskInfo.class, HadoopJob.class, + HadoopJobId.class, UUID.class, DataInput.class); + + if (jobConfData == null) + synchronized(jobConf) { + if (jobConfData == null) { + ByteArrayOutputStream buf = new ByteArrayOutputStream(); + + jobConf.write(new DataOutputStream(buf)); + + jobConfData = buf.toByteArray(); + } + } + + HadoopTaskContext res = (HadoopTaskContext)ctr.newInstance(info, this, jobId, locNodeId, + new DataInputStream(new ByteArrayInputStream(jobConfData))); + + fut.onDone(res); + + return res; + } + catch (Throwable e) { + IgniteCheckedException te = transformException(e); + + fut.onDone(te); + + if (e instanceof Error) + throw (Error)e; + + throw te; + } + } + + /** {@inheritDoc} */ + @Override public void initialize(boolean external, UUID locNodeId) throws IgniteCheckedException { + assert locNodeId != null; + + this.locNodeId = locNodeId; + + ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader()); + + try { + rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(locNodeId, jobId)); + } + finally { + HadoopUtils.restoreContextClassLoader(oldLdr); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("ThrowFromFinallyBlock") + @Override public void dispose(boolean external) throws IgniteCheckedException { + try { + if (rsrcMgr != null && !external) { + File jobLocDir = jobLocalDir(locNodeId, jobId); + + if (jobLocDir.exists()) + U.delete(jobLocDir); + } + } + finally { + taskCtxClsPool.clear(); + + Throwable err = null; + + // Stop the daemon threads that have been created + // with the task class loaders: + while (true) { + Class<? extends HadoopTaskContext> cls = fullCtxClsQueue.poll(); + + if (cls == null) + break; + + try { + final ClassLoader ldr = cls.getClassLoader(); + + try { + // Stop Hadoop daemons for this *task*: + stopHadoopFsDaemons(ldr); + } + catch (Exception e) { + if (err == null) + err = e; + } + + // Also close all the FileSystems cached in + // HadoopLazyConcurrentMap for this *task* class loader: + closeCachedTaskFileSystems(ldr); + } + catch (Throwable e) { + if (err == null) + err = e; + + if (e instanceof Error) + throw (Error)e; + } + } + + assert fullCtxClsQueue.isEmpty(); + + try { + // Close all cached file systems for this *Job*: + fsMap.close(); + } + catch (Exception e) { + if (err == null) + err = e; + } + + if (err != null) + throw U.cast(err); + } + } + + /** + * Stops Hadoop Fs daemon threads. + * @param ldr The task ClassLoader to stop the daemons for. + * @throws Exception On error. + */ + private void stopHadoopFsDaemons(ClassLoader ldr) throws Exception { + Class<?> daemonCls = ldr.loadClass(HadoopClassLoader.CLS_DAEMON); + + Method m = daemonCls.getMethod("dequeueAndStopAll"); + + m.invoke(null); + } + + /** + * Closes all the file systems used by the task. + * @param ldr The task class loader. + * @throws Exception On error.
+ */ + private void closeCachedTaskFileSystems(ClassLoader ldr) throws Exception { + Class<?> clazz = ldr.loadClass(HadoopV2TaskContext.class.getName()); + + Method m = clazz.getMethod("close"); + + m.invoke(null); + } + + /** {@inheritDoc} */ + @Override public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException { + rsrcMgr.prepareTaskWorkDir(taskLocalDir(locNodeId, info)); + } + + /** {@inheritDoc} */ + @Override public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException { + HadoopTaskContext ctx = ctxs.remove(new T2<>(info.type(), info.taskNumber())).get(); + + taskCtxClsPool.add(ctx.getClass()); + + File locDir = taskLocalDir(locNodeId, info); + + if (locDir.exists()) + U.delete(locDir); + } + + /** {@inheritDoc} */ + @Override public void cleanupStagingDirectory() { + rsrcMgr.cleanupStagingDirectory(); + } + + /** + * Getter for job configuration. + * @return The job configuration. + */ + public JobConf jobConf() { + return jobConf; + } + + /** + * Gets file system for this job. + * @param uri The uri. + * @param cfg The configuration. + * @return The file system. + * @throws IOException On error. + */ + public FileSystem fileSystem(@Nullable URI uri, Configuration cfg) throws IOException { + return fileSystemForMrUserWithCaching(uri, cfg, fsMap); + } +} \ No newline at end of file
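[Editor's note] The getTaskContext() method in the patch above loads HadoopV2TaskContext through a per-task HadoopClassLoader and instantiates it reflectively, so every pooled context class carries its own class-loading environment. The sketch below is not part of the commit; it only illustrates that pattern under assumptions: the class and method names (IsolatedTaskFactory, newTask) are hypothetical, and a plain URLClassLoader stands in for Ignite's HadoopClassLoader to keep the example self-contained.

import java.lang.reflect.Constructor;
import java.net.URL;
import java.net.URLClassLoader;

public class IsolatedTaskFactory {
    /**
     * Loads {@code taskClsName} through a dedicated class loader and instantiates it
     * reflectively, mirroring how HadoopV2Job creates task contexts so that each task
     * gets its own class-loading environment.
     */
    public static Object newTask(URL[] classPath, String taskClsName, Object... args) throws Exception {
        // HadoopV2Job uses HadoopClassLoader here; a URLClassLoader keeps the sketch self-contained.
        ClassLoader ldr = new URLClassLoader(classPath, IsolatedTaskFactory.class.getClassLoader());

        Class<?> cls = ldr.loadClass(taskClsName);

        // Pick a public constructor whose arity matches the supplied arguments
        // (getTaskContext() instead uses a fixed five-argument constructor).
        for (Constructor<?> ctr : cls.getConstructors())
            if (ctr.getParameterCount() == args.length)
                return ctr.newInstance(args);

        throw new NoSuchMethodException("No matching constructor in " + taskClsName);
    }
}

For example, newTask(cp, "org.example.MyTask", arg1, arg2) (a hypothetical class name) would load MyTask in its own loader and invoke a two-argument constructor; caching the loaded Class for reuse, as taskCtxClsPool does in the patch, avoids repeating the class-loading cost for subsequent tasks while keeping Hadoop's static state isolated per loader.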