[FLINK-1396][FLINK-1303] Hadoop Input/Output directly in API This adds methods on ExecutionEnvironment for reading with Hadoop Input/OutputFormat.
This also adds support in the Scala API for Hadoop Input/OutputFormats. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cd2f88af Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cd2f88af Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cd2f88af Branch: refs/heads/release-0.8 Commit: cd2f88afdad4c9eb2cd141eb3283d2e0084b2527 Parents: 944e2e3 Author: Aljoscha Krettek <[email protected]> Authored: Wed Jan 28 15:13:30 2015 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Feb 9 15:38:06 2015 +0100 ---------------------------------------------------------------------- docs/hadoop_compatibility.md | 112 ++++-- .../mapred/HadoopInputFormat.java | 298 ---------------- .../mapred/HadoopMapFunction.java | 2 +- .../mapred/HadoopOutputFormat.java | 184 ---------- .../mapred/HadoopReduceCombineFunction.java | 2 +- .../mapred/HadoopReduceFunction.java | 2 +- .../example/HadoopMapredCompatWordCount.java | 4 +- .../mapred/record/HadoopRecordInputFormat.java | 8 +- .../mapred/record/HadoopRecordOutputFormat.java | 6 +- .../mapred/utils/HadoopUtils.java | 87 ----- .../mapred/wrapper/HadoopDummyProgressable.java | 33 -- .../mapred/wrapper/HadoopDummyReporter.java | 70 ---- .../mapred/wrapper/HadoopInputSplit.java | 103 ------ .../mapreduce/HadoopInputFormat.java | 339 ------------------- .../mapreduce/HadoopOutputFormat.java | 227 ------------- .../mapreduce/example/WordCount.java | 4 +- .../mapreduce/utils/HadoopUtils.java | 83 ----- .../mapreduce/wrapper/HadoopInputSplit.java | 90 ----- .../mapred/HadoopIOFormatsITCase.java | 230 ------------- flink-java/pom.xml | 22 ++ .../java/org/apache/flink/api/java/DataSet.java | 3 +- .../flink/api/java/ExecutionEnvironment.java | 64 ++++ .../java/hadoop/mapred/HadoopInputFormat.java | 55 +++ .../hadoop/mapred/HadoopInputFormatBase.java | 253 ++++++++++++++ .../java/hadoop/mapred/HadoopOutputFormat.java | 37 ++ 
.../hadoop/mapred/HadoopOutputFormatBase.java | 165 +++++++++ .../java/hadoop/mapred/utils/HadoopUtils.java | 154 +++++++++ .../mapred/wrapper/HadoopDummyProgressable.java | 33 ++ .../mapred/wrapper/HadoopDummyReporter.java | 70 ++++ .../hadoop/mapred/wrapper/HadoopInputSplit.java | 138 ++++++++ .../hadoop/mapreduce/HadoopInputFormat.java | 60 ++++ .../hadoop/mapreduce/HadoopInputFormatBase.java | 289 ++++++++++++++++ .../hadoop/mapreduce/HadoopOutputFormat.java | 41 +++ .../mapreduce/HadoopOutputFormatBase.java | 203 +++++++++++ .../hadoop/mapreduce/utils/HadoopUtils.java | 82 +++++ .../mapreduce/wrapper/HadoopInputSplit.java | 125 +++++++ .../hadoop/mapred/HadoopInputFormatTest.java | 82 +++++ .../hadoop/mapreduce/HadoopInputFormatTest.java | 84 +++++ .../flink/api/scala/ExecutionEnvironment.scala | 97 +++++- .../scala/hadoop/mapred/HadoopInputFormat.scala | 41 +++ .../hadoop/mapred/HadoopOutputFormat.scala | 29 ++ .../hadoop/mapreduce/HadoopInputFormat.scala | 42 +++ .../hadoop/mapreduce/HadoopOutputFormat.scala | 30 ++ flink-tests/pom.xml | 12 +- .../hadoop/mapred/WordCountMapredITCase.java | 118 +++++++ .../mapreduce/WordCountMapreduceITCase.java | 118 +++++++ .../hadoop/mapred/WordCountMapredITCase.scala | 67 ++++ .../mapreduce/WordCountMapreduceITCase.scala | 70 ++++ 48 files changed, 2668 insertions(+), 1800 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/docs/hadoop_compatibility.md ---------------------------------------------------------------------- diff --git a/docs/hadoop_compatibility.md b/docs/hadoop_compatibility.md index 9b43022..cacca0f 100644 --- a/docs/hadoop_compatibility.md +++ b/docs/hadoop_compatibility.md @@ -23,7 +23,8 @@ under the License. * This will be replaced by the TOC {:toc} -Flink is compatible with many Apache Hadoop's MapReduce interfaces and allows to reuse a lot of code that was implemented for Hadoop MapReduce. 
+Flink is compatible with Apache Hadoop MapReduce interfaces and therefore allows +reusing code that was implemented for Hadoop MapReduce. You can: @@ -38,9 +39,19 @@ This document shows how to use existing Hadoop MapReduce code with Flink. Please ### Project Configuration -The Hadoop Compatibility Layer is part of the `flink-addons` Maven module. All relevant classes are located in the `org.apache.flink.hadoopcompatibility` package. It includes separate packages and classes for the Hadoop `mapred` and `mapreduce` APIs. +Support for Hadoop input/output formats is part of the `flink-java` and +`flink-scala` Maven modules that are always required when writing Flink jobs. +The code is located in `org.apache.flink.api.java.hadoop` and +`org.apache.flink.api.scala.hadoop` in an additional sub-package for the +`mapred` and `mapreduce` API. -Add the following dependency to your `pom.xml` to use the Hadoop Compatibility Layer. +Support for Hadoop Mappers and Reducers is contained in the `flink-staging` +Maven module. +This code resides in the `org.apache.flink.hadoopcompatibility` +package. + +Add the following dependency to your `pom.xml` if you want to reuse Mappers +and Reducers. ~~~xml <dependency> @@ -52,56 +63,70 @@ Add the following dependency to your `pom.xml` to use the Hadoop Compatibility L ### Using Hadoop Data Types -Flink supports all Hadoop `Writable` and `WritableComparable` data types out-of-the-box. You do not need to include the Hadoop Compatibility dependency, if you only want to use your Hadoop data types. See the [Programming Guide](programming_guide.html#data-types) for more details. +Flink supports all Hadoop `Writable` and `WritableComparable` data types +out-of-the-box. You do not need to include the Hadoop Compatibility dependency, +if you only want to use your Hadoop data types. See the +[Programming Guide](programming_guide.html#data-types) for more details. 
### Using Hadoop InputFormats -Flink provides a compatibility wrapper for Hadoop `InputFormats`. Any class that implements `org.apache.hadoop.mapred.InputFormat` or extends `org.apache.hadoop.mapreduce.InputFormat` is supported. Thus, Flink can handle Hadoop built-in formats such as `TextInputFormat` as well as external formats such as Hive's `HCatInputFormat`. Data read from Hadoop InputFormats is converted into a `DataSet<Tuple2<KEY,VALUE>>` where `KEY` is the key and `VALUE` is the value of the original Hadoop key-value pair. - -Flink's InputFormat wrappers are - -- `org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat` and -- `org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat` +Hadoop input formats can be used to create a data source by using +one of the methods `readHadoopFile` or `createHadoopInput` of the +`ExecutionEnvironment`. The former is used for input formats derived +from `FileInputFormat` while the latter has to be used for general purpose +input formats. -and can be used as regular Flink [InputFormats](programming_guide.html#data-sources). +The resulting `DataSet` contains 2-tuples where the first field +is the key and the second field is the value retrieved from the Hadoop +InputFormat. The following example shows how to use Hadoop's `TextInputFormat`. +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> + ~~~java ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - -// Set up the Hadoop TextInputFormat. -Job job = Job.getInstance(); -HadoopInputFormat<LongWritable, Text> hadoopIF = - // create the Flink wrapper. - new HadoopInputFormat<LongWritable, Text>( - // create the Hadoop InputFormat, specify key and value type, and job. - new TextInputFormat(), LongWritable.class, Text.class, job - ); -TextInputFormat.addInputPath(job, new Path(inputPath)); - -// Read data using the Hadoop TextInputFormat. 
-DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF); + +DataSet<Tuple2<LongWritable, Text>> input = + env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, textPath); // Do something with the data. [...] ~~~ -### Using Hadoop OutputFormats +</div> +<div data-lang="scala" markdown="1"> -Flink provides a compatibility wrapper for Hadoop `OutputFormats`. Any class that implements `org.apache.hadoop.mapred.OutputFormat` or extends `org.apache.hadoop.mapreduce.OutputFormat` is supported. The OutputFormat wrapper expects its input data to be a `DataSet<Tuple2<KEY,VALUE>>` where `KEY` is the key and `VALUE` is the value of the Hadoop key-value pair that is processed by the Hadoop OutputFormat. +~~~scala +val env = ExecutionEnvironment.getExecutionEnvironment + +val input: DataSet[(LongWritable, Text)] = + env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath) -Flink's OUtputFormat wrappers are +// Do something with the data. +[...] +~~~ + +</div> -- `org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat` and -- `org.apache.flink.hadoopcompatibility.mapreduce.HadoopOutputFormat` +</div> + +### Using Hadoop OutputFormats -and can be used as regular Flink [OutputFormats](programming_guide.html#data-sinks). +Flink provides a compatibility wrapper for Hadoop `OutputFormats`. Any class +that implements `org.apache.hadoop.mapred.OutputFormat` or extends +`org.apache.hadoop.mapreduce.OutputFormat` is supported. +The OutputFormat wrapper expects its input data to be a DataSet containing +2-tuples of key and value. These are to be processed by the Hadoop OutputFormat. The following example shows how to use Hadoop's `TextOutputFormat`. +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> + ~~~java -// Obtain your result to emit. +// Obtain the result we want to emit DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...] // Set up the Hadoop TextOutputFormat. 
@@ -115,9 +140,32 @@ hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " TextOutputFormat.setOutputPath(job, new Path(outputPath)); // Emit data using the Hadoop TextOutputFormat. -result.output(hadoopOF); +hadoopResult.output(hadoopOF); ~~~ +</div> +<div data-lang="scala" markdown="1"> + +~~~scala +// Obtain your result to emit. +val hadoopResult: DataSet[(Text, IntWritable)] = [...] + +val hadoopOF = new HadoopOutputFormat[Text,IntWritable]( + new TextOutputFormat[Text, IntWritable], + new JobConf) + +hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ") +FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath)) + +hadoopResult.output(hadoopOF) + + +~~~ + +</div> + +</div> + ### Using Hadoop Mappers and Reducers Hadoop Mappers are semantically equivalent to Flink's [FlatMapFunctions](dataset_transformations.html#flatmap) and Hadoop Reducers are equivalent to Flink's [GroupReduceFunctions](dataset_transformations.html#groupreduce-on-grouped-dataset). Flink provides wrappers for implementations of Hadoop MapReduce's `Mapper` and `Reducer` interfaces, i.e., you can reuse your Hadoop Mappers and Reducers in regular Flink programs. At the moment, only the Mapper and Reduce interfaces of Hadoop's mapred API (`org.apache.hadoop.mapred`) are supported. 
http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java deleted file mode 100644 index 8dfda67..0000000 --- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java +++ /dev/null @@ -1,298 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.hadoopcompatibility.mapred; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.ArrayList; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.flink.api.common.io.DefaultInputSplitAssigner; -import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics; -import org.apache.flink.api.common.io.statistics.BaseStatistics; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.WritableTypeInfo; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileStatus; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.Path; -import org.apache.flink.core.io.InputSplitAssigner; -import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopInputSplit; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.util.ReflectionUtils; - -public class HadoopInputFormat<K extends Writable, V extends Writable> implements InputFormat<Tuple2<K,V>, HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> { - - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormat.class); - - private org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat; - private Class<K> keyClass; - private Class<V> valueClass; - private 
JobConf jobConf; - - private transient K key; - private transient V value; - - private transient RecordReader<K, V> recordReader; - private transient boolean fetched = false; - private transient boolean hasNext; - - public HadoopInputFormat() { - super(); - } - - public HadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) { - super(); - this.mapredInputFormat = mapredInputFormat; - this.keyClass = key; - this.valueClass = value; - HadoopUtils.mergeHadoopConf(job); - this.jobConf = job; - ReflectionUtils.setConf(mapredInputFormat, jobConf); - } - - public void setJobConf(JobConf job) { - this.jobConf = job; - } - - public org.apache.hadoop.mapred.InputFormat<K,V> getHadoopInputFormat() { - return mapredInputFormat; - } - - public void setHadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat) { - this.mapredInputFormat = mapredInputFormat; - } - - public JobConf getJobConf() { - return jobConf; - } - - // -------------------------------------------------------------------------------------------- - // InputFormat - // -------------------------------------------------------------------------------------------- - - @Override - public void configure(Configuration parameters) { - // nothing to do - } - - @Override - public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException { - // only gather base statistics for FileInputFormats - if(!(mapredInputFormat instanceof FileInputFormat)) { - return null; - } - - final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ? 
- (FileBaseStatistics) cachedStats : null; - - try { - final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(this.jobConf); - - return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1)); - } catch (IOException ioex) { - if (LOG.isWarnEnabled()) { - LOG.warn("Could not determine statistics due to an io error: " - + ioex.getMessage()); - } - } catch (Throwable t) { - if (LOG.isErrorEnabled()) { - LOG.error("Unexpected problem while getting the file statistics: " - + t.getMessage(), t); - } - } - - // no statistics available - return null; - } - - @Override - public HadoopInputSplit[] createInputSplits(int minNumSplits) - throws IOException { - org.apache.hadoop.mapred.InputSplit[] splitArray = mapredInputFormat.getSplits(jobConf, minNumSplits); - HadoopInputSplit[] hiSplit = new HadoopInputSplit[splitArray.length]; - for(int i=0;i<splitArray.length;i++){ - hiSplit[i] = new HadoopInputSplit(splitArray[i], jobConf); - } - return hiSplit; - } - - @Override - public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) { - return new DefaultInputSplitAssigner(inputSplits); - } - - @Override - public void open(HadoopInputSplit split) throws IOException { - this.recordReader = this.mapredInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter()); - if (this.recordReader instanceof Configurable) { - ((Configurable) this.recordReader).setConf(jobConf); - } - key = this.recordReader.createKey(); - value = this.recordReader.createValue(); - this.fetched = false; - } - - @Override - public boolean reachedEnd() throws IOException { - if(!fetched) { - fetchNext(); - } - return !hasNext; - } - - private void fetchNext() throws IOException { - hasNext = this.recordReader.next(key, value); - fetched = true; - } - - @Override - public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException { - if(!fetched) { - fetchNext(); - } - if(!hasNext) { - return null; - } - record.f0 = key; - 
record.f1 = value; - fetched = false; - return record; - } - - @Override - public void close() throws IOException { - this.recordReader.close(); - } - - // -------------------------------------------------------------------------------------------- - // Helper methods - // -------------------------------------------------------------------------------------------- - - private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths, - ArrayList<FileStatus> files) throws IOException { - - long latestModTime = 0L; - - // get the file info and check whether the cached statistics are still valid. - for(org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) { - - final Path filePath = new Path(hadoopPath.toUri()); - final FileSystem fs = FileSystem.get(filePath.toUri()); - - final FileStatus file = fs.getFileStatus(filePath); - latestModTime = Math.max(latestModTime, file.getModificationTime()); - - // enumerate all files and check their modification time stamp. 
- if (file.isDir()) { - FileStatus[] fss = fs.listStatus(filePath); - files.ensureCapacity(files.size() + fss.length); - - for (FileStatus s : fss) { - if (!s.isDir()) { - files.add(s); - latestModTime = Math.max(s.getModificationTime(), latestModTime); - } - } - } else { - files.add(file); - } - } - - // check whether the cached statistics are still valid, if we have any - if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) { - return cachedStats; - } - - // calculate the whole length - long len = 0; - for (FileStatus s : files) { - len += s.getLen(); - } - - // sanity check - if (len <= 0) { - len = BaseStatistics.SIZE_UNKNOWN; - } - - return new FileBaseStatistics(latestModTime, len, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN); - } - - // -------------------------------------------------------------------------------------------- - // Custom serialization methods - // -------------------------------------------------------------------------------------------- - - private void writeObject(ObjectOutputStream out) throws IOException { - out.writeUTF(mapredInputFormat.getClass().getName()); - out.writeUTF(keyClass.getName()); - out.writeUTF(valueClass.getName()); - jobConf.write(out); - } - - @SuppressWarnings("unchecked") - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - String hadoopInputFormatClassName = in.readUTF(); - String keyClassName = in.readUTF(); - String valueClassName = in.readUTF(); - if(jobConf == null) { - jobConf = new JobConf(); - } - jobConf.readFields(in); - try { - this.mapredInputFormat = (org.apache.hadoop.mapred.InputFormat<K,V>) Class.forName(hadoopInputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance(); - } catch (Exception e) { - throw new RuntimeException("Unable to instantiate the hadoop input format", e); - } - try { - this.keyClass = (Class<K>) Class.forName(keyClassName, true, Thread.currentThread().getContextClassLoader()); - } 
catch (Exception e) { - throw new RuntimeException("Unable to find key class.", e); - } - try { - this.valueClass = (Class<V>) Class.forName(valueClassName, true, Thread.currentThread().getContextClassLoader()); - } catch (Exception e) { - throw new RuntimeException("Unable to find value class.", e); - } - ReflectionUtils.setConf(mapredInputFormat, jobConf); - } - - // -------------------------------------------------------------------------------------------- - // ResultTypeQueryable - // -------------------------------------------------------------------------------------------- - - @Override - public TypeInformation<Tuple2<K,V>> getProducedType() { - return new TupleTypeInfo<Tuple2<K,V>>(new WritableTypeInfo<K>((Class<K>) keyClass), new WritableTypeInfo<V>((Class<V>) valueClass)); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java index 9bc36f3..ad94c01 100644 --- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java +++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java @@ -31,7 +31,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.WritableTypeInfo; import org.apache.flink.configuration.Configuration; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter; +import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter; 
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector; import org.apache.flink.util.Collector; import org.apache.flink.util.InstantiationUtil; http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java deleted file mode 100644 index 64c539b..0000000 --- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.hadoopcompatibility.mapred; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; - -import org.apache.flink.api.common.io.FinalizeOnMaster; -import org.apache.flink.api.common.io.OutputFormat; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyProgressable; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.FileOutputCommitter; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobContext; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.RecordWriter; -import org.apache.hadoop.mapred.TaskAttemptContext; -import org.apache.hadoop.mapred.TaskAttemptID; -import org.apache.hadoop.util.ReflectionUtils; - - -public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K, V>>, FinalizeOnMaster { - - private static final long serialVersionUID = 1L; - - private JobConf jobConf; - private org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat; - private transient RecordWriter<K,V> recordWriter; - private transient FileOutputCommitter fileOutputCommitter; - private transient TaskAttemptContext context; - private transient JobContext jobContext; - - public HadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat, JobConf job) { - super(); - this.mapredOutputFormat = mapredOutputFormat; - HadoopUtils.mergeHadoopConf(job); - this.jobConf = job; - } - - public void setJobConf(JobConf job) { - this.jobConf = job; - } - - public JobConf getJobConf() { - return jobConf; - } - - public org.apache.hadoop.mapred.OutputFormat<K,V> getHadoopOutputFormat() { - return mapredOutputFormat; - } - - 
public void setHadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat) { - this.mapredOutputFormat = mapredOutputFormat; - } - - // -------------------------------------------------------------------------------------------- - // OutputFormat - // -------------------------------------------------------------------------------------------- - - @Override - public void configure(Configuration parameters) { - // nothing to do - } - - /** - * create the temporary output file for hadoop RecordWriter. - * @param taskNumber The number of the parallel instance. - * @param numTasks The number of parallel tasks. - * @throws IOException - */ - @Override - public void open(int taskNumber, int numTasks) throws IOException { - if (Integer.toString(taskNumber + 1).length() > 6) { - throw new IOException("Task id too large."); - } - - TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_" - + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0") - + Integer.toString(taskNumber + 1) - + "_0"); - - this.jobConf.set("mapred.task.id", taskAttemptID.toString()); - this.jobConf.setInt("mapred.task.partition", taskNumber + 1); - // for hadoop 2.2 - this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString()); - this.jobConf.setInt("mapreduce.task.partition", taskNumber + 1); - - try { - this.context = HadoopUtils.instantiateTaskAttemptContext(this.jobConf, taskAttemptID); - } catch (Exception e) { - throw new RuntimeException(e); - } - - this.fileOutputCommitter = new FileOutputCommitter(); - - try { - this.jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID()); - } catch (Exception e) { - throw new RuntimeException(e); - } - - this.fileOutputCommitter.setupJob(jobContext); - - this.recordWriter = this.mapredOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable()); - } - - @Override - public void writeRecord(Tuple2<K, 
V> record) throws IOException { - this.recordWriter.write(record.f0, record.f1); - } - - /** - * commit the task by moving the output file out from the temporary directory. - * @throws IOException - */ - @Override - public void close() throws IOException { - this.recordWriter.close(new HadoopDummyReporter()); - - if (this.fileOutputCommitter.needsTaskCommit(this.context)) { - this.fileOutputCommitter.commitTask(this.context); - } - } - - @Override - public void finalizeGlobal(int parallelism) throws IOException { - - try { - JobContext jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID()); - FileOutputCommitter fileOutputCommitter = new FileOutputCommitter(); - - // finalize HDFS output format - fileOutputCommitter.commitJob(jobContext); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - // -------------------------------------------------------------------------------------------- - // Custom serialization methods - // -------------------------------------------------------------------------------------------- - - private void writeObject(ObjectOutputStream out) throws IOException { - out.writeUTF(mapredOutputFormat.getClass().getName()); - jobConf.write(out); - } - - @SuppressWarnings("unchecked") - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - String hadoopOutputFormatName = in.readUTF(); - if(jobConf == null) { - jobConf = new JobConf(); - } - jobConf.readFields(in); - try { - this.mapredOutputFormat = (org.apache.hadoop.mapred.OutputFormat<K,V>) Class.forName(hadoopOutputFormatName, true, Thread.currentThread().getContextClassLoader()).newInstance(); - } catch (Exception e) { - throw new RuntimeException("Unable to instantiate the hadoop output format", e); - } - ReflectionUtils.setConf(mapredOutputFormat, jobConf); - } -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java index 5d83bad..3687bb2 100644 --- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java +++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java @@ -31,7 +31,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.WritableTypeInfo; import org.apache.flink.configuration.Configuration; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter; +import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter; import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector; import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator; import org.apache.flink.util.Collector; http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java index 1f0aedd..439c31c 
100644 --- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java +++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java @@ -31,7 +31,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.WritableTypeInfo; import org.apache.flink.configuration.Configuration; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter; +import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter; import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector; import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator; import org.apache.flink.util.Collector; http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java index de20fab..3547e47 100644 --- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java +++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java @@ -24,9 +24,9 @@ import java.util.Iterator; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; -import 
org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat; +import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat; import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction; -import org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat; +import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat; import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java index f8153a2..edcc43b 100644 --- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java +++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java @@ -29,9 +29,9 @@ import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.HadoopTypeConverter; -import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopInputSplit; +import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils; +import 
org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter; +import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopInputSplit; import org.apache.flink.types.Record; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; @@ -80,7 +80,7 @@ public class HadoopRecordInputFormat<K, V> implements InputFormat<Record, Hadoop org.apache.hadoop.mapred.InputSplit[] splitArray = hadoopInputFormat.getSplits(jobConf, minNumSplits); HadoopInputSplit[] hiSplit = new HadoopInputSplit[splitArray.length]; for(int i=0;i<splitArray.length;i++){ - hiSplit[i] = new HadoopInputSplit(splitArray[i], jobConf); + hiSplit[i] = new HadoopInputSplit(i, splitArray[i], jobConf); } return hiSplit; } http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java index 74118a3..e519062 100644 --- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java +++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java @@ -27,9 +27,9 @@ import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.configuration.Configuration; import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.HadoopFileOutputCommitter; import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.FlinkTypeConverter; -import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils; -import 
org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyProgressable; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter; +import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils; +import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyProgressable; +import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter; import org.apache.flink.types.Record; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordWriter; http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java deleted file mode 100644 index 2d2f518..0000000 --- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.hadoopcompatibility.mapred.utils; - -import java.lang.reflect.Constructor; -import java.util.Map; - -import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobContext; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.TaskAttemptContext; -import org.apache.hadoop.mapred.TaskAttemptID; - - -public class HadoopUtils { - - /** - * Merge HadoopConfiguration into JobConf. This is necessary for the HDFS configuration. - */ - public static void mergeHadoopConf(JobConf jobConf) { - org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration(); - for (Map.Entry<String, String> e : hadoopConf) { - jobConf.set(e.getKey(), e.getValue()); - } - } - - public static JobContext instantiateJobContext(JobConf jobConf, JobID jobId) throws Exception { - try { - // for Hadoop 1.xx - Class<?> clazz = null; - if(!TaskAttemptContext.class.isInterface()) { - clazz = Class.forName("org.apache.hadoop.mapred.JobContext", true, Thread.currentThread().getContextClassLoader()); - } - // for Hadoop 2.xx - else { - clazz = Class.forName("org.apache.hadoop.mapred.JobContextImpl", true, Thread.currentThread().getContextClassLoader()); - } - Constructor<?> constructor = clazz.getDeclaredConstructor(JobConf.class, org.apache.hadoop.mapreduce.JobID.class); - // for Hadoop 1.xx - constructor.setAccessible(true); - JobContext context = (JobContext) constructor.newInstance(jobConf, jobId); - - return context; - } catch(Exception e) { - throw new Exception("Could not create instance of JobContext.", e); - } - } - - public static TaskAttemptContext instantiateTaskAttemptContext(JobConf jobConf, TaskAttemptID taskAttemptID) throws Exception { - try { - // for Hadoop 1.xx - Class<?> clazz = null; - if(!TaskAttemptContext.class.isInterface()) 
{ - clazz = Class.forName("org.apache.hadoop.mapred.TaskAttemptContext", true, Thread.currentThread().getContextClassLoader()); - } - // for Hadoop 2.xx - else { - clazz = Class.forName("org.apache.hadoop.mapred.TaskAttemptContextImpl", true, Thread.currentThread().getContextClassLoader()); - } - Constructor<?> constructor = clazz.getDeclaredConstructor(JobConf.class, TaskAttemptID.class); - // for Hadoop 1.xx - constructor.setAccessible(true); - TaskAttemptContext context = (TaskAttemptContext) constructor.newInstance(jobConf, taskAttemptID); - return context; - } catch(Exception e) { - throw new Exception("Could not create instance of TaskAttemptContext.", e); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java deleted file mode 100644 index 483dd2f..0000000 --- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.hadoopcompatibility.mapred.wrapper; - -import org.apache.hadoop.util.Progressable; - -/** - * This is a dummy progress - * - */ -public class HadoopDummyProgressable implements Progressable { - @Override - public void progress() { - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java deleted file mode 100644 index 84a1e9e..0000000 --- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.hadoopcompatibility.mapred.wrapper; - -import org.apache.hadoop.mapred.Counters.Counter; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.Reporter; - -/** - * This is a dummy progress monitor / reporter - * - */ -public class HadoopDummyReporter implements Reporter { - - @Override - public void progress() { - } - - @Override - public void setStatus(String status) { - - } - - @Override - public Counter getCounter(Enum<?> name) { - return null; - } - - @Override - public Counter getCounter(String group, String name) { - return null; - } - - @Override - public void incrCounter(Enum<?> key, long amount) { - - } - - @Override - public void incrCounter(String group, String counter, long amount) { - - } - - @Override - public InputSplit getInputSplit() throws UnsupportedOperationException { - return null; - } - // There should be an @Override, but some CDH4 dependency does not contain this method - public float getProgress() { - return 0; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java deleted file mode 100644 index 
cf36a9d..0000000 --- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.hadoopcompatibility.mapred.wrapper; - -import java.io.IOException; - -import org.apache.flink.core.io.InputSplit; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.io.WritableFactories; -import org.apache.hadoop.mapred.JobConf; - - -public class HadoopInputSplit implements InputSplit { - - private static final long serialVersionUID = 1L; - - - private transient org.apache.hadoop.mapred.InputSplit hadoopInputSplit; - - private JobConf jobConf; - - private int splitNumber; - private String hadoopInputSplitTypeName; - - - public org.apache.hadoop.mapred.InputSplit getHadoopInputSplit() { - return hadoopInputSplit; - } - - public HadoopInputSplit() { - super(); - } - - public HadoopInputSplit(org.apache.hadoop.mapred.InputSplit hInputSplit, JobConf jobconf) { - this.hadoopInputSplit = hInputSplit; - this.hadoopInputSplitTypeName = 
hInputSplit.getClass().getName(); - this.jobConf = jobconf; - } - - @Override - public void write(DataOutputView out) throws IOException { - out.writeInt(splitNumber); - out.writeUTF(hadoopInputSplitTypeName); - jobConf.write(out); - hadoopInputSplit.write(out); - } - - @Override - public void read(DataInputView in) throws IOException { - this.splitNumber=in.readInt(); - this.hadoopInputSplitTypeName = in.readUTF(); - if(hadoopInputSplit == null) { - try { - Class<? extends org.apache.hadoop.io.Writable> inputSplit = - Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class); - this.hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance( inputSplit ); - } - catch (Exception e) { - throw new RuntimeException("Unable to create InputSplit", e); - } - } - jobConf = new JobConf(); - jobConf.readFields(in); - if (this.hadoopInputSplit instanceof Configurable) { - ((Configurable) this.hadoopInputSplit).setConf(this.jobConf); - } - this.hadoopInputSplit.readFields(in); - - } - - @Override - public int getSplitNumber() { - return this.splitNumber; - } - - public void setSplitNumber(int splitNumber) { - this.splitNumber = splitNumber; - } - - public void setHadoopInputSplit( - org.apache.hadoop.mapred.InputSplit hadoopInputSplit) { - this.hadoopInputSplit = hadoopInputSplit; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java deleted file mode 100644 index 280aaf9..0000000 --- 
a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java +++ /dev/null @@ -1,339 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.hadoopcompatibility.mapreduce; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.ArrayList; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.flink.api.common.io.DefaultInputSplitAssigner; -import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics; -import org.apache.flink.api.common.io.statistics.BaseStatistics; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.WritableTypeInfo; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileStatus; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.Path; -import 
org.apache.flink.core.io.InputSplitAssigner; -import org.apache.flink.hadoopcompatibility.mapreduce.utils.HadoopUtils; -import org.apache.flink.hadoopcompatibility.mapreduce.wrapper.HadoopInputSplit; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; - -public class HadoopInputFormat<K extends Writable, V extends Writable> implements InputFormat<Tuple2<K,V>, HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> { - - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormat.class); - - private org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat; - private Class<K> keyClass; - private Class<V> valueClass; - private org.apache.hadoop.conf.Configuration configuration; - - private transient RecordReader<K, V> recordReader; - private boolean fetched = false; - private boolean hasNext; - - public HadoopInputFormat() { - super(); - } - - public HadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) { - super(); - this.mapreduceInputFormat = mapreduceInputFormat; - this.keyClass = key; - this.valueClass = value; - this.configuration = job.getConfiguration(); - HadoopUtils.mergeHadoopConf(configuration); - } - - public void setConfiguration(org.apache.hadoop.conf.Configuration configuration) { - this.configuration = configuration; - } - - public org.apache.hadoop.mapreduce.InputFormat<K,V> getHadoopInputFormat() { - return this.mapreduceInputFormat; - } - - public void setHadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat) { - this.mapreduceInputFormat = 
mapreduceInputFormat; - } - - public org.apache.hadoop.conf.Configuration getConfiguration() { - return this.configuration; - } - - // -------------------------------------------------------------------------------------------- - // InputFormat - // -------------------------------------------------------------------------------------------- - - @Override - public void configure(Configuration parameters) { - // nothing to do - } - - @Override - public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException { - // only gather base statistics for FileInputFormats - if(!(mapreduceInputFormat instanceof FileInputFormat)) { - return null; - } - - JobContext jobContext = null; - try { - jobContext = HadoopUtils.instantiateJobContext(configuration, null); - } catch (Exception e) { - throw new RuntimeException(e); - } - - final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ? - (FileBaseStatistics) cachedStats : null; - - try { - final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(jobContext); - return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1)); - } catch (IOException ioex) { - if (LOG.isWarnEnabled()) { - LOG.warn("Could not determine statistics due to an io error: " - + ioex.getMessage()); - } - } catch (Throwable t) { - if (LOG.isErrorEnabled()) { - LOG.error("Unexpected problem while getting the file statistics: " - + t.getMessage(), t); - } - } - - // no statistics available - return null; - } - - @Override - public HadoopInputSplit[] createInputSplits(int minNumSplits) - throws IOException { - configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits); - - JobContext jobContext = null; - try { - jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID()); - } catch (Exception e) { - throw new RuntimeException(e); - } - - List<org.apache.hadoop.mapreduce.InputSplit> splits; - try { - splits = 
this.mapreduceInputFormat.getSplits(jobContext); - } catch (InterruptedException e) { - throw new IOException("Could not get Splits.", e); - } - HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()]; - - for(int i = 0; i < hadoopInputSplits.length; i++){ - hadoopInputSplits[i] = new HadoopInputSplit(splits.get(i), jobContext); - } - return hadoopInputSplits; - } - - @Override - public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) { - return new DefaultInputSplitAssigner(inputSplits); - } - - @Override - public void open(HadoopInputSplit split) throws IOException { - TaskAttemptContext context = null; - try { - context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID()); - } catch(Exception e) { - throw new RuntimeException(e); - } - - try { - this.recordReader = this.mapreduceInputFormat - .createRecordReader(split.getHadoopInputSplit(), context); - this.recordReader.initialize(split.getHadoopInputSplit(), context); - } catch (InterruptedException e) { - throw new IOException("Could not create RecordReader.", e); - } finally { - this.fetched = false; - } - } - - @Override - public boolean reachedEnd() throws IOException { - if(!this.fetched) { - fetchNext(); - } - return !this.hasNext; - } - - private void fetchNext() throws IOException { - try { - this.hasNext = this.recordReader.nextKeyValue(); - } catch (InterruptedException e) { - throw new IOException("Could not fetch next KeyValue pair.", e); - } finally { - this.fetched = true; - } - } - - @Override - public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException { - if(!this.fetched) { - fetchNext(); - } - if(!this.hasNext) { - return null; - } - try { - record.f0 = this.recordReader.getCurrentKey(); - record.f1 = this.recordReader.getCurrentValue(); - } catch (InterruptedException e) { - throw new IOException("Could not get KeyValue pair.", e); - } - this.fetched = false; - - return record; - } - - @Override - public void 
close() throws IOException { - this.recordReader.close(); - } - - // -------------------------------------------------------------------------------------------- - // Helper methods - // -------------------------------------------------------------------------------------------- - - private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths, - ArrayList<FileStatus> files) throws IOException { - - long latestModTime = 0L; - - // get the file info and check whether the cached statistics are still valid. - for(org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) { - - final Path filePath = new Path(hadoopPath.toUri()); - final FileSystem fs = FileSystem.get(filePath.toUri()); - - final FileStatus file = fs.getFileStatus(filePath); - latestModTime = Math.max(latestModTime, file.getModificationTime()); - - // enumerate all files and check their modification time stamp. - if (file.isDir()) { - FileStatus[] fss = fs.listStatus(filePath); - files.ensureCapacity(files.size() + fss.length); - - for (FileStatus s : fss) { - if (!s.isDir()) { - files.add(s); - latestModTime = Math.max(s.getModificationTime(), latestModTime); - } - } - } else { - files.add(file); - } - } - - // check whether the cached statistics are still valid, if we have any - if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) { - return cachedStats; - } - - // calculate the whole length - long len = 0; - for (FileStatus s : files) { - len += s.getLen(); - } - - // sanity check - if (len <= 0) { - len = BaseStatistics.SIZE_UNKNOWN; - } - - return new FileBaseStatistics(latestModTime, len, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN); - } - - // -------------------------------------------------------------------------------------------- - // Custom serialization methods - // -------------------------------------------------------------------------------------------- - - private void writeObject(ObjectOutputStream out) 
throws IOException { - out.writeUTF(this.mapreduceInputFormat.getClass().getName()); - out.writeUTF(this.keyClass.getName()); - out.writeUTF(this.valueClass.getName()); - this.configuration.write(out); - } - - @SuppressWarnings("unchecked") - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - String hadoopInputFormatClassName = in.readUTF(); - String keyClassName = in.readUTF(); - String valueClassName = in.readUTF(); - - org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration(); - configuration.readFields(in); - - if(this.configuration == null) { - this.configuration = configuration; - } - - try { - this.mapreduceInputFormat = (org.apache.hadoop.mapreduce.InputFormat<K,V>) Class.forName(hadoopInputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance(); - } catch (Exception e) { - throw new RuntimeException("Unable to instantiate the hadoop input format", e); - } - try { - this.keyClass = (Class<K>) Class.forName(keyClassName, true, Thread.currentThread().getContextClassLoader()); - } catch (Exception e) { - throw new RuntimeException("Unable to find key class.", e); - } - try { - this.valueClass = (Class<V>) Class.forName(valueClassName, true, Thread.currentThread().getContextClassLoader()); - } catch (Exception e) { - throw new RuntimeException("Unable to find value class.", e); - } - } - - // -------------------------------------------------------------------------------------------- - // ResultTypeQueryable - // -------------------------------------------------------------------------------------------- - - @Override - public TypeInformation<Tuple2<K,V>> getProducedType() { - return new TupleTypeInfo<Tuple2<K,V>>(new WritableTypeInfo<K>((Class<K>) keyClass), new WritableTypeInfo<V>((Class<V>) valueClass)); - } -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java deleted file mode 100644 index 402372c..0000000 --- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java +++ /dev/null @@ -1,227 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.hadoopcompatibility.mapreduce; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; - -import org.apache.flink.api.common.io.FinalizeOnMaster; -import org.apache.flink.api.common.io.OutputFormat; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.hadoopcompatibility.mapreduce.utils.HadoopUtils; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; - - -public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K, V>>, FinalizeOnMaster { - - private static final long serialVersionUID = 1L; - - private org.apache.hadoop.conf.Configuration configuration; - private org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat; - private transient RecordWriter<K,V> recordWriter; - private transient FileOutputCommitter fileOutputCommitter; - private transient TaskAttemptContext context; - private transient int taskNumber; - - public HadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat, Job job) { - super(); - this.mapreduceOutputFormat = mapreduceOutputFormat; - this.configuration = job.getConfiguration(); - HadoopUtils.mergeHadoopConf(configuration); - } - - public void setConfiguration(org.apache.hadoop.conf.Configuration configuration) { - this.configuration = configuration; - } - - public org.apache.hadoop.conf.Configuration getConfiguration() { - return this.configuration; - } - - public 
org.apache.hadoop.mapreduce.OutputFormat<K,V> getHadoopOutputFormat() { - return this.mapreduceOutputFormat; - } - - public void setHadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat) { - this.mapreduceOutputFormat = mapreduceOutputFormat; - } - - // -------------------------------------------------------------------------------------------- - // OutputFormat - // -------------------------------------------------------------------------------------------- - - @Override - public void configure(Configuration parameters) { - // nothing to do - } - - /** - * create the temporary output file for hadoop RecordWriter. - * @param taskNumber The number of the parallel instance. - * @param numTasks The number of parallel tasks. - * @throws IOException - */ - @Override - public void open(int taskNumber, int numTasks) throws IOException { - if (Integer.toString(taskNumber + 1).length() > 6) { - throw new IOException("Task id too large."); - } - - this.taskNumber = taskNumber+1; - - // for hadoop 2.2 - this.configuration.set("mapreduce.output.basename", "tmp"); - - TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_" - + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0") - + Integer.toString(taskNumber + 1) - + "_0"); - - this.configuration.set("mapred.task.id", taskAttemptID.toString()); - this.configuration.setInt("mapred.task.partition", taskNumber + 1); - // for hadoop 2.2 - this.configuration.set("mapreduce.task.attempt.id", taskAttemptID.toString()); - this.configuration.setInt("mapreduce.task.partition", taskNumber + 1); - - try { - this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID); - } catch (Exception e) { - throw new RuntimeException(e); - } - - this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), context); - - try { - 
this.fileOutputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration, new JobID())); - } catch (Exception e) { - throw new RuntimeException(e); - } - - // compatible for hadoop 2.2.0, the temporary output directory is different from hadoop 1.2.1 - this.configuration.set("mapreduce.task.output.dir", this.fileOutputCommitter.getWorkPath().toString()); - - try { - this.recordWriter = this.mapreduceOutputFormat.getRecordWriter(this.context); - } catch (InterruptedException e) { - throw new IOException("Could not create RecordWriter.", e); - } - } - - - @Override - public void writeRecord(Tuple2<K, V> record) throws IOException { - try { - this.recordWriter.write(record.f0, record.f1); - } catch (InterruptedException e) { - throw new IOException("Could not write Record.", e); - } - } - - /** - * commit the task by moving the output file out from the temporary directory. - * @throws IOException - */ - @Override - public void close() throws IOException { - try { - this.recordWriter.close(this.context); - } catch (InterruptedException e) { - throw new IOException("Could not close RecordReader.", e); - } - - if (this.fileOutputCommitter.needsTaskCommit(this.context)) { - this.fileOutputCommitter.commitTask(this.context); - } - - Path outputPath = new Path(this.configuration.get("mapred.output.dir")); - - // rename tmp-file to final name - FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration); - - String taskNumberStr = Integer.toString(this.taskNumber); - String tmpFileTemplate = "tmp-r-00000"; - String tmpFile = tmpFileTemplate.substring(0,11-taskNumberStr.length())+taskNumberStr; - - if(fs.exists(new Path(outputPath.toString()+"/"+tmpFile))) { - fs.rename(new Path(outputPath.toString()+"/"+tmpFile), new Path(outputPath.toString()+"/"+taskNumberStr)); - } - } - - @Override - public void finalizeGlobal(int parallelism) throws IOException { - - JobContext jobContext; - TaskAttemptContext taskContext; - try { - - TaskAttemptID taskAttemptID 
= TaskAttemptID.forName("attempt__0000_r_" - + String.format("%" + (6 - Integer.toString(1).length()) + "s"," ").replace(" ", "0") - + Integer.toString(1) - + "_0"); - - jobContext = HadoopUtils.instantiateJobContext(this.configuration, new JobID()); - taskContext = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID); - } catch (Exception e) { - throw new RuntimeException(e); - } - this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), taskContext); - - // finalize HDFS output format - this.fileOutputCommitter.commitJob(jobContext); - } - - // -------------------------------------------------------------------------------------------- - // Custom serialization methods - // -------------------------------------------------------------------------------------------- - - private void writeObject(ObjectOutputStream out) throws IOException { - out.writeUTF(this.mapreduceOutputFormat.getClass().getName()); - this.configuration.write(out); - } - - @SuppressWarnings("unchecked") - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - String hadoopOutputFormatClassName = in.readUTF(); - - org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration(); - configuration.readFields(in); - - if(this.configuration == null) { - this.configuration = configuration; - } - - try { - this.mapreduceOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat<K,V>) Class.forName(hadoopOutputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance(); - } catch (Exception e) { - throw new RuntimeException("Unable to instantiate the hadoop output format", e); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java ---------------------------------------------------------------------- diff 
--git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java index 2b99fd2..f5758eb 100644 --- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java +++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java @@ -32,8 +32,8 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat; -import org.apache.flink.hadoopcompatibility.mapreduce.HadoopOutputFormat; +import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat; +import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat; /** * Implements a word count which takes the input file and counts the number of http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java deleted file mode 100644 index 86b730f..0000000 --- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.hadoopcompatibility.mapreduce.utils; - -import java.lang.reflect.Constructor; -import java.util.Map; - -import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; - -public class HadoopUtils { - - /** - * Merge HadoopConfiguration into Configuration. This is necessary for the HDFS configuration. 
- */ - public static void mergeHadoopConf(Configuration configuration) { - Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration(); - - for (Map.Entry<String, String> e : hadoopConf) { - configuration.set(e.getKey(), e.getValue()); - } - } - - public static JobContext instantiateJobContext(Configuration configuration, JobID jobId) throws Exception { - try { - Class<?> clazz; - // for Hadoop 1.xx - if(JobContext.class.isInterface()) { - clazz = Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl", true, Thread.currentThread().getContextClassLoader()); - } - // for Hadoop 2.xx - else { - clazz = Class.forName("org.apache.hadoop.mapreduce.JobContext", true, Thread.currentThread().getContextClassLoader()); - } - Constructor<?> constructor = clazz.getConstructor(Configuration.class, JobID.class); - JobContext context = (JobContext) constructor.newInstance(configuration, jobId); - - return context; - } catch(Exception e) { - throw new Exception("Could not create instance of JobContext."); - } - } - - public static TaskAttemptContext instantiateTaskAttemptContext(Configuration configuration, TaskAttemptID taskAttemptID) throws Exception { - try { - Class<?> clazz; - // for Hadoop 1.xx - if(JobContext.class.isInterface()) { - clazz = Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl"); - } - // for Hadoop 2.xx - else { - clazz = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext"); - } - Constructor<?> constructor = clazz.getConstructor(Configuration.class, TaskAttemptID.class); - TaskAttemptContext context = (TaskAttemptContext) constructor.newInstance(configuration, taskAttemptID); - - return context; - } catch(Exception e) { - throw new Exception("Could not create instance of TaskAttemptContext."); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java 
---------------------------------------------------------------------- diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java deleted file mode 100644 index 25cd0d8..0000000 --- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.hadoopcompatibility.mapreduce.wrapper; - -import java.io.IOException; - -import org.apache.flink.core.io.InputSplit; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableFactories; -import org.apache.hadoop.mapreduce.JobContext; - - -public class HadoopInputSplit implements InputSplit { - - public transient org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit; - public transient JobContext jobContext; - - private int splitNumber; - - public org.apache.hadoop.mapreduce.InputSplit getHadoopInputSplit() { - return mapreduceInputSplit; - } - - - public HadoopInputSplit() { - super(); - } - - - public HadoopInputSplit(org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit, JobContext jobContext) { - if(!(mapreduceInputSplit instanceof Writable)) { - throw new IllegalArgumentException("InputSplit must implement Writable interface."); - } - this.mapreduceInputSplit = mapreduceInputSplit; - this.jobContext = jobContext; - } - - @Override - public void write(DataOutputView out) throws IOException { - out.writeInt(this.splitNumber); - out.writeUTF(this.mapreduceInputSplit.getClass().getName()); - Writable w = (Writable) this.mapreduceInputSplit; - w.write(out); - } - - @Override - public void read(DataInputView in) throws IOException { - this.splitNumber=in.readInt(); - String className = in.readUTF(); - - if(this.mapreduceInputSplit == null) { - try { - Class<? 
extends org.apache.hadoop.io.Writable> inputSplit = - Class.forName(className).asSubclass(org.apache.hadoop.io.Writable.class); - this.mapreduceInputSplit = (org.apache.hadoop.mapreduce.InputSplit) WritableFactories.newInstance(inputSplit); - } catch (Exception e) { - throw new RuntimeException("Unable to create InputSplit", e); - } - } - ((Writable)this.mapreduceInputSplit).readFields(in); - } - - @Override - public int getSplitNumber() { - return this.splitNumber; - } - - public void setSplitNumber(int splitNumber) { - this.splitNumber = splitNumber; - } -}
