http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java deleted file mode 100644 index 6ef0f2e..0000000 --- a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java +++ /dev/null @@ -1,230 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.test.hadoopcompatibility.mapred; - -import org.apache.commons.lang.RandomStringUtils; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat; -import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction; -import org.apache.flink.test.util.JavaProgramTestBase; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.SequenceFileInputFormat; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URI; -import java.util.Collection; -import java.util.LinkedList; - -@RunWith(Parameterized.class) -public class HadoopIOFormatsITCase extends JavaProgramTestBase { - - private static int NUM_PROGRAMS = 2; - - private int curProgId = config.getInteger("ProgramId", -1); - private String[] resultPath; - private String[] expectedResult; - private String sequenceFileInPath; - private String sequenceFileInPathNull; - - public HadoopIOFormatsITCase(Configuration config) { - super(config); - } - - @Override - protected void preSubmit() 
throws Exception { - resultPath = new String[] {getTempDirPath("result0"), getTempDirPath("result1") }; - - File sequenceFile = createAndRegisterTempFile("seqFile"); - sequenceFileInPath = sequenceFile.toURI().toString(); - - // Create a sequence file - org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); - FileSystem fs = FileSystem.get(URI.create(sequenceFile.getAbsolutePath()), conf); - Path path = new Path(sequenceFile.getAbsolutePath()); - - // ------------------ Long / Text Key Value pair: ------------ - int kvCount = 4; - - LongWritable key = new LongWritable(); - Text value = new Text(); - SequenceFile.Writer writer = null; - try { - writer = SequenceFile.createWriter( fs, conf, path, key.getClass(), value.getClass()); - for (int i = 0; i < kvCount; i ++) { - if(i == 1) { - // write key = 0 a bit more often. - for(int a = 0;a < 15; a++) { - key.set(i); - value.set(i+" - somestring"); - writer.append(key, value); - } - } - key.set(i); - value.set(i+" - somestring"); - writer.append(key, value); - } - } finally { - IOUtils.closeStream(writer); - } - - - // ------------------ Long / Text Key Value pair: ------------ - - File sequenceFileNull = createAndRegisterTempFile("seqFileNullKey"); - sequenceFileInPathNull = sequenceFileNull.toURI().toString(); - path = new Path(sequenceFileInPathNull); - - LongWritable value1 = new LongWritable(); - SequenceFile.Writer writer1 = null; - try { - writer1 = SequenceFile.createWriter( fs, conf, path, NullWritable.class, value1.getClass()); - for (int i = 0; i < kvCount; i ++) { - value1.set(i); - writer1.append(NullWritable.get(), value1); - } - } finally { - IOUtils.closeStream(writer1); - } - } - - @Override - protected void testProgram() throws Exception { - expectedResult = HadoopIOFormatPrograms.runProgram(curProgId, resultPath, sequenceFileInPath, sequenceFileInPathNull); - } - - @Override - protected void postSubmit() throws Exception { - for(int i = 0; i < resultPath.length; i++) { 
- compareResultsByLinesInMemory(expectedResult[i], resultPath[i]); - } - } - - @Parameters - public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { - - LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); - - for(int i=1; i <= NUM_PROGRAMS; i++) { - Configuration config = new Configuration(); - config.setInteger("ProgramId", i); - tConfigs.add(config); - } - - return toParameterList(tConfigs); - } - - public static class HadoopIOFormatPrograms { - - public static String[] runProgram(int progId, String resultPath[], String sequenceFileInPath, String sequenceFileInPathNull) throws Exception { - - switch(progId) { - case 1: { - /** - * Test sequence file, including a key access. - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - SequenceFileInputFormat<LongWritable, Text> sfif = new SequenceFileInputFormat<LongWritable, Text>(); - JobConf hdconf = new JobConf(); - SequenceFileInputFormat.addInputPath(hdconf, new Path(sequenceFileInPath)); - HadoopInputFormat<LongWritable, Text> hif = new HadoopInputFormat<LongWritable, Text>(sfif, LongWritable.class, Text.class, hdconf); - DataSet<Tuple2<LongWritable, Text>> ds = env.createInput(hif); - DataSet<Tuple2<Long, Text>> sumed = ds.map(new MapFunction<Tuple2<LongWritable, Text>, Tuple2<Long, Text>>() { - @Override - public Tuple2<Long, Text> map(Tuple2<LongWritable, Text> value) throws Exception { - return new Tuple2<Long, Text>(value.f0.get(), value.f1); - } - }).sum(0); - sumed.writeAsText(resultPath[0]); - DataSet<String> res = ds.distinct(0).map(new MapFunction<Tuple2<LongWritable, Text>, String>() { - @Override - public String map(Tuple2<LongWritable, Text> value) throws Exception { - return value.f1 + " - " + value.f0.get(); - } - }); - res.writeAsText(resultPath[1]); - env.execute(); - - // return expected result - return new String [] {"(21,3 - somestring)", "0 - somestring - 0\n" + - "1 - somestring - 1\n" + - "2 - 
somestring - 2\n" + - "3 - somestring - 3\n"}; - - } - case 2: { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - SequenceFileInputFormat<NullWritable, LongWritable> sfif = new SequenceFileInputFormat<NullWritable, LongWritable>(); - JobConf hdconf = new JobConf(); - SequenceFileInputFormat.addInputPath(hdconf, new Path(sequenceFileInPathNull)); - HadoopInputFormat<NullWritable, LongWritable> hif = new HadoopInputFormat<NullWritable, LongWritable>(sfif, NullWritable.class, LongWritable.class, hdconf); - DataSet<Tuple2<NullWritable, LongWritable>> ds = env.createInput(hif); - DataSet<Tuple2<Void, Long>> res = ds.map(new MapFunction<Tuple2<NullWritable, LongWritable>, Tuple2<Void, Long>>() { - @Override - public Tuple2<Void, Long> map(Tuple2<NullWritable, LongWritable> value) throws Exception { - return new Tuple2<Void, Long>(null, value.f1.get()); - } - }); - DataSet<Tuple2<Void, Long>> res1 = res.groupBy(1).sum(1); - res1.writeAsText(resultPath[1]); - res.writeAsText(resultPath[0]); - env.execute(); - - // return expected result - return new String [] {"(null,2)\n" + - "(null,0)\n" + - "(null,1)\n" + - "(null,3)", - "(null,0)\n" + - "(null,1)\n" + - "(null,2)\n" + - "(null,3)"}; - } - default: - throw new IllegalArgumentException("Invalid program id"); - } - - } - - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/pom.xml ---------------------------------------------------------------------- diff --git a/flink-java/pom.xml b/flink-java/pom.xml index cc5664b..f345591 100644 --- a/flink-java/pom.xml +++ b/flink-java/pom.xml @@ -152,6 +152,12 @@ under the License. <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-core</artifactId> + <!--<exclusions>--> + <!--<exclusion>--> + <!--<groupId>*</groupId>--> + <!--<artifactId>*</artifactId>--> + <!--</exclusion>--> + <!--</exclusions>--> </dependency> </dependencies> </profile> @@ -167,6 +173,22 @@ under the License. <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> + <!--<exclusions>--> + <!--<exclusion>--> + <!--<groupId>*</groupId>--> + <!--<artifactId>*</artifactId>--> + <!--</exclusion>--> + <!--</exclusions>--> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <!--<exclusions>--> + <!--<exclusion>--> + <!--<groupId>*</groupId>--> + <!--<artifactId>*</artifactId>--> + <!--</exclusion>--> + <!--</exclusions>--> </dependency> </dependencies> </profile> http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index 6415570..81caa2a 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -1265,8 +1265,7 @@ public abstract class DataSet<T> { this.context.registerDataSink(sink); return sink; } - - + // -------------------------------------------------------------------------------------------- // Utilities // 
-------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 563787f..61a74b9 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -38,6 +38,7 @@ import org.apache.flink.api.common.io.FileInputFormat; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat; import org.apache.flink.api.java.io.CollectionInputFormat; import org.apache.flink.api.java.io.CsvReader; import org.apache.flink.api.java.io.IteratorInputFormat; @@ -58,6 +59,8 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.types.StringValue; import org.apache.flink.util.NumberSequenceIterator; import org.apache.flink.util.SplittableIterator; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; /** * The ExecutionEnviroment is the context in which a program is executed. A @@ -400,6 +403,67 @@ public abstract class ExecutionEnvironment { return new DataSource<X>(this, inputFormat, producedType, Utils.getCallLocationName()); } + + // ----------------------------------- Hadoop Input Format --------------------------------------- + + /** + * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. The + * given inputName is set on the given job. 
+ */ + public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) { + DataSource<Tuple2<K, V>> result = createHadoopInput(mapredInputFormat, key, value, job); + + org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath)); + + return result; + } + + /** + * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. A + * {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created. + */ + public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) { + return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf()); + } + + /** + * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.InputFormat}. + */ + public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) { + HadoopInputFormat<K, V> hadoopInputFormat = new HadoopInputFormat<K, V>(mapredInputFormat, key, value, job); + + return this.createInput(hadoopInputFormat); + } + + /** + * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. The + * given inputName is set on the given job. 
+ */ + public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException { + DataSource<Tuple2<K, V>> result = createHadoopInput(mapredInputFormat, key, value, job); + + org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new org.apache + .hadoop.fs.Path(inputPath)); + + return result; + } + + /** + * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. A + * {@link org.apache.hadoop.mapreduce.Job} with the given inputPath is created. + */ + public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException { + return readHadoopFile(mapredInputFormat, key, value, inputPath, Job.getInstance()); + } + + /** + * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.InputFormat}. 
+ */ + public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, Job job) { + org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> hadoopInputFormat = new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V>(mapredInputFormat, key, value, job); + + return this.createInput(hadoopInputFormat); + } // ----------------------------------- Collection --------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java new file mode 100644 index 0000000..8b25249 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.api.java.hadoop.mapred; + +import java.io.IOException; + +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.hadoop.mapred.JobConf; + +public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K,V>> implements ResultTypeQueryable<Tuple2<K,V>> { + + public HadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) { + super(mapredInputFormat, key, value, job); + } + + @Override + public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException { + if(!fetched) { + fetchNext(); + } + if(!hasNext) { + return null; + } + record.f0 = key; + record.f1 = value; + fetched = false; + return record; + } + + @Override + public TypeInformation<Tuple2<K,V>> getProducedType() { + return new TupleTypeInfo<Tuple2<K,V>>(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java new file mode 100644 index 0000000..40f6631 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.java.hadoop.mapred; + +import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.io.LocatableInputSplitAssigner; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils; +import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter; +import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopInputSplit; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.util.ReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; + +public abstract class HadoopInputFormatBase<K, V, T> implements InputFormat<T, HadoopInputSplit> { 
+ + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormatBase.class); + + private org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat; + protected Class<K> keyClass; + protected Class<V> valueClass; + private JobConf jobConf; + + protected transient K key; + protected transient V value; + + private transient RecordReader<K, V> recordReader; + protected transient boolean fetched = false; + protected transient boolean hasNext; + + public HadoopInputFormatBase(org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) { + super(); + this.mapredInputFormat = mapredInputFormat; + this.keyClass = key; + this.valueClass = value; + HadoopUtils.mergeHadoopConf(job); + this.jobConf = job; + ReflectionUtils.setConf(mapredInputFormat, jobConf); + } + + public JobConf getJobConf() { + return jobConf; + } + + // -------------------------------------------------------------------------------------------- + // InputFormat + // -------------------------------------------------------------------------------------------- + + @Override + public void configure(Configuration parameters) { + // nothing to do + } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException { + // only gather base statistics for FileInputFormats + if(!(mapredInputFormat instanceof FileInputFormat)) { + return null; + } + + final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ? 
+ (FileBaseStatistics) cachedStats : null; + + try { + final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(this.jobConf); + + return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1)); + } catch (IOException ioex) { + if (LOG.isWarnEnabled()) { + LOG.warn("Could not determine statistics due to an io error: " + + ioex.getMessage()); + } + } catch (Throwable t) { + if (LOG.isErrorEnabled()) { + LOG.error("Unexpected problem while getting the file statistics: " + + t.getMessage(), t); + } + } + + // no statistics available + return null; + } + + @Override + public HadoopInputSplit[] createInputSplits(int minNumSplits) + throws IOException { + org.apache.hadoop.mapred.InputSplit[] splitArray = mapredInputFormat.getSplits(jobConf, minNumSplits); + HadoopInputSplit[] hiSplit = new HadoopInputSplit[splitArray.length]; + for(int i=0;i<splitArray.length;i++){ + hiSplit[i] = new HadoopInputSplit(i, splitArray[i], jobConf); + } + return hiSplit; + } + + @Override + public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) { + return new LocatableInputSplitAssigner(inputSplits); + } + + @Override + public void open(HadoopInputSplit split) throws IOException { + this.recordReader = this.mapredInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter()); + if (this.recordReader instanceof Configurable) { + ((Configurable) this.recordReader).setConf(jobConf); + } + key = this.recordReader.createKey(); + value = this.recordReader.createValue(); + this.fetched = false; + } + + @Override + public boolean reachedEnd() throws IOException { + if(!fetched) { + fetchNext(); + } + return !hasNext; + } + + protected void fetchNext() throws IOException { + hasNext = this.recordReader.next(key, value); + fetched = true; + } + + @Override + public void close() throws IOException { + this.recordReader.close(); + } + + // 
-------------------------------------------------------------------------------------------- + // Helper methods + // -------------------------------------------------------------------------------------------- + + private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths, + ArrayList<FileStatus> files) throws IOException { + + long latestModTime = 0L; + + // get the file info and check whether the cached statistics are still valid. + for(org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) { + + final Path filePath = new Path(hadoopPath.toUri()); + final FileSystem fs = FileSystem.get(filePath.toUri()); + + final FileStatus file = fs.getFileStatus(filePath); + latestModTime = Math.max(latestModTime, file.getModificationTime()); + + // enumerate all files and check their modification time stamp. + if (file.isDir()) { + FileStatus[] fss = fs.listStatus(filePath); + files.ensureCapacity(files.size() + fss.length); + + for (FileStatus s : fss) { + if (!s.isDir()) { + files.add(s); + latestModTime = Math.max(s.getModificationTime(), latestModTime); + } + } + } else { + files.add(file); + } + } + + // check whether the cached statistics are still valid, if we have any + if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) { + return cachedStats; + } + + // calculate the whole length + long len = 0; + for (FileStatus s : files) { + len += s.getLen(); + } + + // sanity check + if (len <= 0) { + len = BaseStatistics.SIZE_UNKNOWN; + } + + return new FileBaseStatistics(latestModTime, len, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN); + } + + // -------------------------------------------------------------------------------------------- + // Custom serialization methods + // -------------------------------------------------------------------------------------------- + + private void writeObject(ObjectOutputStream out) throws IOException { + 
out.writeUTF(mapredInputFormat.getClass().getName()); + out.writeUTF(keyClass.getName()); + out.writeUTF(valueClass.getName()); + jobConf.write(out); + } + + @SuppressWarnings("unchecked") + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + String hadoopInputFormatClassName = in.readUTF(); + String keyClassName = in.readUTF(); + String valueClassName = in.readUTF(); + if(jobConf == null) { + jobConf = new JobConf(); + } + jobConf.readFields(in); + try { + this.mapredInputFormat = (org.apache.hadoop.mapred.InputFormat<K,V>) Class.forName(hadoopInputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance(); + } catch (Exception e) { + throw new RuntimeException("Unable to instantiate the hadoop input format", e); + } + try { + this.keyClass = (Class<K>) Class.forName(keyClassName, true, Thread.currentThread().getContextClassLoader()); + } catch (Exception e) { + throw new RuntimeException("Unable to find key class.", e); + } + try { + this.valueClass = (Class<V>) Class.forName(valueClassName, true, Thread.currentThread().getContextClassLoader()); + } catch (Exception e) { + throw new RuntimeException("Unable to find value class.", e); + } + ReflectionUtils.setConf(mapredInputFormat, jobConf); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java new file mode 100644 index 0000000..75623e2 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.java.hadoop.mapred; + +import java.io.IOException; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.hadoop.mapred.JobConf; + +public class HadoopOutputFormat<K,V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> { + + public HadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat, JobConf job) { + super(mapredOutputFormat, job); + } + + @Override + public void writeRecord(Tuple2<K, V> record) throws IOException { + this.recordWriter.write(record.f0, record.f1); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java new file mode 100644 index 0000000..a59b96f --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.java.hadoop.mapred;

import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyProgressable;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

/**
 * Base class for Flink OutputFormats that delegate to a Hadoop {@code mapred}
 * OutputFormat. Subclasses map the Flink record type {@code T} onto the Hadoop
 * key/value pair in {@link #writeRecord(Object)} (not declared here).
 *
 * <p>Serialization note: the wrapped Hadoop output format and the JobConf are
 * written/restored via the custom {@code writeObject}/{@code readObject}
 * methods below, since Hadoop classes are not Java-serializable themselves.
 *
 * @param <K> Hadoop key type
 * @param <V> Hadoop value type
 * @param <T> Flink record type
 */
public abstract class HadoopOutputFormatBase<K, V, T> implements OutputFormat<T>, FinalizeOnMaster {

	private static final long serialVersionUID = 1L;

	private JobConf jobConf;
	private org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat;
	protected transient RecordWriter<K,V> recordWriter;
	private transient FileOutputCommitter fileOutputCommitter;
	private transient TaskAttemptContext context;
	private transient JobContext jobContext;

	/**
	 * Wraps the given Hadoop output format and merges the global Hadoop
	 * configuration (HDFS settings) into the supplied JobConf.
	 */
	public HadoopOutputFormatBase(org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat, JobConf job) {
		super();
		this.mapredOutputFormat = mapredOutputFormat;
		HadoopUtils.mergeHadoopConf(job);
		this.jobConf = job;
	}

	public JobConf getJobConf() {
		return jobConf;
	}

	// --------------------------------------------------------------------------------------------
	//  OutputFormat
	// --------------------------------------------------------------------------------------------

	@Override
	public void configure(Configuration parameters) {
		// nothing to do
	}

	/**
	 * create the temporary output file for hadoop RecordWriter.
	 * @param taskNumber The number of the parallel instance.
	 * @param numTasks The number of parallel tasks.
	 * @throws java.io.IOException
	 */
	@Override
	public void open(int taskNumber, int numTasks) throws IOException {
		// Hadoop task attempt IDs encode the task number in exactly six digits.
		if (Integer.toString(taskNumber + 1).length() > 6) {
			throw new IOException("Task id too large.");
		}

		// FIX: use "%06d" to zero-pad the (1-based) task number to six digits.
		// The previous String.format("%" + (6 - len) + "s", " ").replace(" ", "0")
		// construction produced the same padding for 1-5 digit ids but threw a
		// format exception when the id had exactly six digits (format width 0).
		TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
				+ String.format("%06d", taskNumber + 1)
				+ "_0");

		// set both the Hadoop 1.x and the Hadoop 2.x property names
		this.jobConf.set("mapred.task.id", taskAttemptID.toString());
		this.jobConf.setInt("mapred.task.partition", taskNumber + 1);
		// for hadoop 2.2
		this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString());
		this.jobConf.setInt("mapreduce.task.partition", taskNumber + 1);

		try {
			this.context = HadoopUtils.instantiateTaskAttemptContext(this.jobConf, taskAttemptID);
		} catch (Exception e) {
			throw new RuntimeException(e);
		}

		this.fileOutputCommitter = new FileOutputCommitter();

		try {
			this.jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
		} catch (Exception e) {
			throw new RuntimeException(e);
		}

		this.fileOutputCommitter.setupJob(jobContext);

		// the first argument (FileSystem) is unused by the Hadoop file-based writers,
		// hence null is passed here — TODO confirm for custom OutputFormats
		this.recordWriter = this.mapredOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
	}

	/**
	 * commit the task by moving the output file out from the temporary directory.
	 * @throws java.io.IOException
	 */
	@Override
	public void close() throws IOException {
		this.recordWriter.close(new HadoopDummyReporter());

		if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
			this.fileOutputCommitter.commitTask(this.context);
		}
	}

	/**
	 * Runs once on the master after all parallel tasks finished; commits the
	 * whole job so the per-task outputs become visible.
	 */
	@Override
	public void finalizeGlobal(int parallelism) throws IOException {

		try {
			// a fresh context/committer is needed here: this runs on the master,
			// where the transient per-task fields were never initialized
			JobContext jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
			FileOutputCommitter fileOutputCommitter = new FileOutputCommitter();

			// finalize HDFS output format
			fileOutputCommitter.commitJob(jobContext);
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

	// --------------------------------------------------------------------------------------------
	//  Custom serialization methods
	// --------------------------------------------------------------------------------------------

	private void writeObject(ObjectOutputStream out) throws IOException {
		// store the class name plus the Writable-serialized JobConf
		out.writeUTF(mapredOutputFormat.getClass().getName());
		jobConf.write(out);
	}

	@SuppressWarnings("unchecked")
	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
		String hadoopOutputFormatName = in.readUTF();
		if(jobConf == null) {
			jobConf = new JobConf();
		}
		jobConf.readFields(in);
		try {
			this.mapredOutputFormat = (org.apache.hadoop.mapred.OutputFormat<K,V>) Class.forName(hadoopOutputFormatName, true, Thread.currentThread().getContextClassLoader()).newInstance();
		} catch (Exception e) {
			throw new RuntimeException("Unable to instantiate the hadoop output format", e);
		}
		ReflectionUtils.setConf(mapredOutputFormat, jobConf);
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java new file mode 100644 index 0000000..d4dc297 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.flink.api.java.hadoop.mapred.utils;

import java.io.File;
import java.lang.reflect.Constructor;
import java.util.Map;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Utilities for working with Hadoop {@code mapred} classes across Hadoop 1.x
 * and 2.x, and for loading the Hadoop configuration files.
 */
public class HadoopUtils {

	private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class);

	/**
	 * Merge HadoopConfiguration into JobConf. This is necessary for the HDFS configuration.
	 */
	public static void mergeHadoopConf(JobConf jobConf) {
		final org.apache.hadoop.conf.Configuration hadoopConf = getHadoopConfiguration();
		for (Map.Entry<String, String> entry : hadoopConf) {
			jobConf.set(entry.getKey(), entry.getValue());
		}
	}

	/**
	 * Creates a {@link JobContext} reflectively, choosing the concrete class
	 * by Hadoop version: in Hadoop 1.x {@code JobContext} is a class, in
	 * Hadoop 2.x it is an interface implemented by {@code JobContextImpl}.
	 */
	public static JobContext instantiateJobContext(JobConf jobConf, JobID jobId) throws Exception {
		try {
			// TaskAttemptContext is an interface only in Hadoop 2.x; use that
			// fact to detect which concrete JobContext class is available
			final String className = TaskAttemptContext.class.isInterface()
					? "org.apache.hadoop.mapred.JobContextImpl"   // Hadoop 2.x
					: "org.apache.hadoop.mapred.JobContext";      // Hadoop 1.x
			final Class<?> clazz = Class.forName(className, true, Thread.currentThread().getContextClassLoader());

			final Constructor<?> constructor = clazz.getDeclaredConstructor(JobConf.class, org.apache.hadoop.mapreduce.JobID.class);
			// the Hadoop 1.x constructor is not public
			constructor.setAccessible(true);
			return (JobContext) constructor.newInstance(jobConf, jobId);
		} catch(Exception e) {
			throw new Exception("Could not create instance of JobContext.", e);
		}
	}

	/**
	 * Creates a {@link TaskAttemptContext} reflectively; same Hadoop 1.x/2.x
	 * class-vs-interface dance as {@link #instantiateJobContext}.
	 */
	public static TaskAttemptContext instantiateTaskAttemptContext(JobConf jobConf, TaskAttemptID taskAttemptID) throws Exception {
		try {
			final String className = TaskAttemptContext.class.isInterface()
					? "org.apache.hadoop.mapred.TaskAttemptContextImpl"   // Hadoop 2.x
					: "org.apache.hadoop.mapred.TaskAttemptContext";      // Hadoop 1.x
			final Class<?> clazz = Class.forName(className, true, Thread.currentThread().getContextClassLoader());

			final Constructor<?> constructor = clazz.getDeclaredConstructor(JobConf.class, TaskAttemptID.class);
			// the Hadoop 1.x constructor is not public
			constructor.setAccessible(true);
			return (TaskAttemptContext) constructor.newInstance(jobConf, taskAttemptID);
		} catch(Exception e) {
			throw new Exception("Could not create instance of TaskAttemptContext.", e);
		}
	}

	/**
	 * Returns a new Hadoop Configuration object using the path to the hadoop conf configured
	 * in the main configuration (flink-conf.yaml).
	 * This method is public because it is being used in the HadoopDataSource.
	 */
	public static org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
		final Configuration retConf = new org.apache.hadoop.conf.Configuration();

		// We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
		// the hdfs configuration.

		// 1. approach: explicit file paths from the Flink configuration
		final String hdfsDefaultPath = GlobalConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
		if (hdfsDefaultPath != null) {
			retConf.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
		} else {
			LOG.debug("Cannot find hdfs-default configuration file");
		}

		final String hdfsSitePath = GlobalConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
		if (hdfsSitePath != null) {
			retConf.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
		} else {
			LOG.debug("Cannot find hdfs-site configuration file");
		}

		// 2. approach: conf directories from the Flink config and environment variables
		final String[] possibleHadoopConfPaths = new String[4];
		possibleHadoopConfPaths[0] = GlobalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
		possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR");

		if (System.getenv("HADOOP_HOME") != null) {
			possibleHadoopConfPaths[2] = System.getenv("HADOOP_HOME") + "/conf";
			possibleHadoopConfPaths[3] = System.getenv("HADOOP_HOME") + "/etc/hadoop"; // hadoop 2.2
		}

		for (String confDir : possibleHadoopConfPaths) {
			if (confDir != null && new File(confDir).exists()) {
				addResourceIfExists(retConf, confDir + "/core-site.xml");
				addResourceIfExists(retConf, confDir + "/hdfs-site.xml");
			}
		}
		return retConf;
	}

	/** Adds the given site file to the configuration if it exists on disk. */
	private static void addResourceIfExists(org.apache.hadoop.conf.Configuration conf, String filePath) {
		if (new File(filePath).exists()) {
			conf.addResource(new org.apache.hadoop.fs.Path(filePath));

			if (LOG.isDebugEnabled()) {
				LOG.debug("Adding " + filePath + " to hadoop configuration");
			}
		}
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java new file mode 100644 index 0000000..215b890 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.api.java.hadoop.mapred.wrapper; + +import org.apache.hadoop.util.Progressable; + +/** + * This is a dummy progress + * + */ +public class HadoopDummyProgressable implements Progressable { + @Override + public void progress() { + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java new file mode 100644 index 0000000..01104ac --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.api.java.hadoop.mapred.wrapper; + +import org.apache.hadoop.mapred.Counters.Counter; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.Reporter; + +/** + * This is a dummy progress monitor / reporter + * + */ +public class HadoopDummyReporter implements Reporter { + + @Override + public void progress() { + } + + @Override + public void setStatus(String status) { + + } + + @Override + public Counter getCounter(Enum<?> name) { + return null; + } + + @Override + public Counter getCounter(String group, String name) { + return null; + } + + @Override + public void incrCounter(Enum<?> key, long amount) { + + } + + @Override + public void incrCounter(String group, String counter, long amount) { + + } + + @Override + public InputSplit getInputSplit() throws UnsupportedOperationException { + return null; + } + // There should be an @Override, but some CDH4 dependency does not contain this method + public float getProgress() { + return 0; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java new file mode 100644 index 0000000..beef5d7 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.java.hadoop.mapred.wrapper;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.mapred.JobConf;

/**
 * A Flink {@link LocatableInputSplit} that wraps a Hadoop {@code mapred}
 * InputSplit so it can be shipped through Flink's split machinery.
 *
 * <p>The wrapped split is transient and is reconstructed on deserialization by
 * class name via {@link WritableFactories}. The split supports both Flink's
 * {@code DataInputView}/{@code DataOutputView} serialization and plain Java
 * serialization; both paths share the same helpers (previously the logic was
 * duplicated verbatim in {@code read}/{@code readObject} and
 * {@code write}/{@code writeObject}).
 */
public class HadoopInputSplit extends LocatableInputSplit {

	private static final long serialVersionUID = 1L;

	// the wrapped Hadoop split; not Java-serializable, hence transient
	private transient org.apache.hadoop.mapred.InputSplit hadoopInputSplit;

	private JobConf jobConf;

	private int splitNumber;
	// class name of the wrapped split, used to re-instantiate it on read
	private String hadoopInputSplitTypeName;


	public org.apache.hadoop.mapred.InputSplit getHadoopInputSplit() {
		return hadoopInputSplit;
	}

	public HadoopInputSplit() {
		super();
	}

	public HadoopInputSplit(int splitNumber, org.apache.hadoop.mapred.InputSplit hInputSplit, JobConf jobconf) {
		this.splitNumber = splitNumber;
		this.hadoopInputSplit = hInputSplit;
		this.hadoopInputSplitTypeName = hInputSplit.getClass().getName();
		this.jobConf = jobconf;
	}

	@Override
	public void write(DataOutputView out) throws IOException {
		writeSplit(out);
	}

	@Override
	public void read(DataInputView in) throws IOException {
		readSplit(in);
	}

	private void writeObject(ObjectOutputStream out) throws IOException {
		writeSplit(out);
	}

	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
		readSplit(in);
	}

	/**
	 * Serializes split number, split class name, the JobConf and the wrapped
	 * split itself. Works for both DataOutputView and ObjectOutputStream,
	 * since both implement {@link DataOutput}.
	 */
	private void writeSplit(DataOutput out) throws IOException {
		out.writeInt(splitNumber);
		out.writeUTF(hadoopInputSplitTypeName);
		jobConf.write(out);
		hadoopInputSplit.write(out);
	}

	/**
	 * Counterpart of {@link #writeSplit(DataOutput)}: re-instantiates the
	 * wrapped split by class name if necessary, restores the JobConf, injects
	 * it into the split when it is {@link Configurable}, then reads the
	 * split's own fields.
	 */
	private void readSplit(DataInput in) throws IOException {
		this.splitNumber = in.readInt();
		this.hadoopInputSplitTypeName = in.readUTF();
		if (hadoopInputSplit == null) {
			try {
				Class<? extends org.apache.hadoop.io.Writable> inputSplit =
						Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class);
				this.hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance(inputSplit);
			}
			catch (Exception e) {
				throw new RuntimeException("Unable to create InputSplit", e);
			}
		}
		jobConf = new JobConf();
		jobConf.readFields(in);
		if (this.hadoopInputSplit instanceof Configurable) {
			((Configurable) this.hadoopInputSplit).setConf(this.jobConf);
		}
		this.hadoopInputSplit.readFields(in);
	}

	@Override
	public int getSplitNumber() {
		return this.splitNumber;
	}

	/**
	 * Returns the host names of the wrapped split; an I/O failure is treated
	 * as "no locality information" (deliberate best-effort).
	 */
	@Override
	public String[] getHostnames() {
		try {
			return this.hadoopInputSplit.getLocations();
		} catch(IOException ioe) {
			return new String[0];
		}
	}
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.java.hadoop.mapreduce;

import java.io.IOException;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.hadoop.mapreduce.Job;
import org.apache.flink.api.java.typeutils.TypeExtractor;

/**
 * Flink InputFormat that reads key/value pairs from a Hadoop
 * {@code mapreduce} InputFormat and exposes them as {@link Tuple2} records.
 *
 * @param <K> type of the Hadoop key
 * @param <V> type of the Hadoop value
 */
public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K, V>> implements ResultTypeQueryable<Tuple2<K,V>> {

	public HadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
		super(mapreduceInputFormat, key, value, job);
	}

	/**
	 * Returns the next key/value pair as a tuple, or {@code null} once the
	 * underlying record reader is exhausted. The given tuple is reused.
	 */
	@Override
	public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException {
		// make sure a pending record has been pulled from the record reader
		if (!fetched) {
			fetchNext();
		}
		if (!hasNext) {
			return null;
		}

		try {
			record.f0 = recordReader.getCurrentKey();
			record.f1 = recordReader.getCurrentValue();
		} catch (InterruptedException e) {
			throw new IOException("Could not get KeyValue pair.", e);
		}
		// the fetched record has been consumed
		fetched = false;

		return record;
	}

	/** Produces the tuple type info from the configured key and value classes. */
	@Override
	public TypeInformation<Tuple2<K,V>> getProducedType() {
		TypeInformation<K> keyTypeInfo = TypeExtractor.createTypeInfo(keyClass);
		TypeInformation<V> valueTypeInfo = TypeExtractor.createTypeInfo(valueClass);
		return new TupleTypeInfo<Tuple2<K,V>>(keyTypeInfo, valueTypeInfo);
	}
}
---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java new file mode 100644 index 0000000..2a6c0f4 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.flink.api.java.hadoop.mapreduce;

import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;

/**
 * Base class for Flink InputFormats that wrap a Hadoop {@code mapreduce}
 * (new-API) InputFormat. Subclasses map the Hadoop key/value pair onto the
 * Flink record type {@code T}.
 *
 * @param <K> Hadoop key type
 * @param <V> Hadoop value type
 * @param <T> Flink record type
 */
public abstract class HadoopInputFormatBase<K, V, T> implements InputFormat<T, HadoopInputSplit> {

	private static final long serialVersionUID = 1L;

	private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormatBase.class);

	private org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat;
	protected Class<K> keyClass;
	protected Class<V> valueClass;
	private org.apache.hadoop.conf.Configuration configuration;

	protected transient RecordReader<K, V> recordReader;
	// true when hasNext reflects the state of the record reader
	protected boolean fetched = false;
	protected boolean hasNext;

	/**
	 * Wraps the given Hadoop input format; the job's configuration is kept and
	 * merged with the global Hadoop configuration (HDFS settings).
	 */
	public HadoopInputFormatBase(org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
		super();
		this.mapreduceInputFormat = mapreduceInputFormat;
		this.keyClass = key;
		this.valueClass = value;
		this.configuration = job.getConfiguration();
		HadoopUtils.mergeHadoopConf(configuration);
	}

	public org.apache.hadoop.conf.Configuration getConfiguration() {
		return this.configuration;
	}

	// --------------------------------------------------------------------------------------------
	//  InputFormat
	// --------------------------------------------------------------------------------------------

	@Override
	public void configure(Configuration parameters) {
		// nothing to do
	}

	/**
	 * Returns basic file statistics (size, modification time) when the wrapped
	 * format is a FileInputFormat; {@code null} otherwise or on failure.
	 */
	@Override
	public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
		// only gather base statistics for FileInputFormats
		if(!(mapreduceInputFormat instanceof FileInputFormat)) {
			return null;
		}

		JobContext jobContext = null;
		try {
			jobContext = HadoopUtils.instantiateJobContext(configuration, null);
		} catch (Exception e) {
			throw new RuntimeException(e);
		}

		final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ?
				(FileBaseStatistics) cachedStats : null;

		try {
			final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(jobContext);
			return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1));
		} catch (IOException ioex) {
			if (LOG.isWarnEnabled()) {
				LOG.warn("Could not determine statistics due to an io error: "
						+ ioex.getMessage());
			}
		} catch (Throwable t) {
			if (LOG.isErrorEnabled()) {
				LOG.error("Unexpected problem while getting the file statistics: "
						+ t.getMessage(), t);
			}
		}

		// no statistics available
		return null;
	}

	@Override
	public HadoopInputSplit[] createInputSplits(int minNumSplits)
			throws IOException {
		// NOTE(review): "split.minsize" is a byte size in Hadoop, but a split
		// count is written here — looks questionable; confirm against the
		// intended splitting behavior before changing.
		configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits);

		JobContext jobContext = null;
		try {
			jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID());
		} catch (Exception e) {
			throw new RuntimeException(e);
		}

		List<org.apache.hadoop.mapreduce.InputSplit> splits;
		try {
			splits = this.mapreduceInputFormat.getSplits(jobContext);
		} catch (InterruptedException e) {
			// restore the interrupt flag so callers can observe the interruption
			Thread.currentThread().interrupt();
			throw new IOException("Could not get Splits.", e);
		}
		HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()];

		for(int i = 0; i < hadoopInputSplits.length; i++){
			hadoopInputSplits[i] = new HadoopInputSplit(i, splits.get(i), jobContext);
		}
		return hadoopInputSplits;
	}

	@Override
	public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) {
		return new LocatableInputSplitAssigner(inputSplits);
	}

	/**
	 * Opens a record reader for the given split.
	 */
	@Override
	public void open(HadoopInputSplit split) throws IOException {
		TaskAttemptContext context = null;
		try {
			context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
		} catch(Exception e) {
			throw new RuntimeException(e);
		}

		try {
			this.recordReader = this.mapreduceInputFormat
					.createRecordReader(split.getHadoopInputSplit(), context);
			this.recordReader.initialize(split.getHadoopInputSplit(), context);
		} catch (InterruptedException e) {
			// restore the interrupt flag so callers can observe the interruption
			Thread.currentThread().interrupt();
			throw new IOException("Could not create RecordReader.", e);
		} finally {
			this.fetched = false;
		}
	}

	@Override
	public boolean reachedEnd() throws IOException {
		if(!this.fetched) {
			fetchNext();
		}
		return !this.hasNext;
	}

	/**
	 * Advances the record reader and caches whether another pair is available.
	 */
	protected void fetchNext() throws IOException {
		try {
			this.hasNext = this.recordReader.nextKeyValue();
		} catch (InterruptedException e) {
			// restore the interrupt flag so callers can observe the interruption
			Thread.currentThread().interrupt();
			throw new IOException("Could not fetch next KeyValue pair.", e);
		} finally {
			this.fetched = true;
		}
	}

	@Override
	public void close() throws IOException {
		this.recordReader.close();
	}

	// --------------------------------------------------------------------------------------------
	//  Helper methods
	// --------------------------------------------------------------------------------------------

	/**
	 * Collects size/modification-time statistics for the given paths,
	 * returning the cached statistics unchanged when nothing changed since.
	 */
	private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths,
			ArrayList<FileStatus> files) throws IOException {

		long latestModTime = 0L;

		// get the file info and check whether the cached statistics are still valid.
		for(org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) {

			final Path filePath = new Path(hadoopPath.toUri());
			final FileSystem fs = FileSystem.get(filePath.toUri());

			final FileStatus file = fs.getFileStatus(filePath);
			latestModTime = Math.max(latestModTime, file.getModificationTime());

			// enumerate all files and check their modification time stamp.
			if (file.isDir()) {
				FileStatus[] fss = fs.listStatus(filePath);
				files.ensureCapacity(files.size() + fss.length);

				for (FileStatus s : fss) {
					if (!s.isDir()) {
						files.add(s);
						latestModTime = Math.max(s.getModificationTime(), latestModTime);
					}
				}
			} else {
				files.add(file);
			}
		}

		// check whether the cached statistics are still valid, if we have any
		if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) {
			return cachedStats;
		}

		// calculate the whole length
		long len = 0;
		for (FileStatus s : files) {
			len += s.getLen();
		}

		// sanity check
		if (len <= 0) {
			len = BaseStatistics.SIZE_UNKNOWN;
		}

		return new FileBaseStatistics(latestModTime, len, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
	}

	// --------------------------------------------------------------------------------------------
	//  Custom serialization methods
	// --------------------------------------------------------------------------------------------

	private void writeObject(ObjectOutputStream out) throws IOException {
		out.writeUTF(this.mapreduceInputFormat.getClass().getName());
		out.writeUTF(this.keyClass.getName());
		out.writeUTF(this.valueClass.getName());
		this.configuration.write(out);
	}

	@SuppressWarnings("unchecked")
	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
		String hadoopInputFormatClassName = in.readUTF();
		String keyClassName = in.readUTF();
		String valueClassName = in.readUTF();

		org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
		configuration.readFields(in);

		if(this.configuration == null) {
			this.configuration = configuration;
		}

		try {
			this.mapreduceInputFormat = (org.apache.hadoop.mapreduce.InputFormat<K,V>) Class.forName(hadoopInputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance();
		} catch (Exception e) {
			throw new RuntimeException("Unable to instantiate the hadoop input format", e);
		}
		try {
			this.keyClass = (Class<K>) Class.forName(keyClassName, true, Thread.currentThread().getContextClassLoader());
		} catch (Exception e) {
			throw new RuntimeException("Unable to find key class.", e);
		}
		try {
			this.valueClass = (Class<V>) Class.forName(valueClassName, true, Thread.currentThread().getContextClassLoader());
		} catch (Exception e) {
			throw new RuntimeException("Unable to find value class.", e);
		}
	}
}
+ */ + + +package org.apache.flink.api.java.hadoop.mapreduce; + +import java.io.IOException; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.hadoop.mapreduce.Job; + +public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> { + + public HadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat, Job job) { + super(mapreduceOutputFormat, job); + } + + @Override + public void writeRecord(Tuple2<K, V> record) throws IOException { + try { + this.recordWriter.write(record.f0, record.f1); + } catch (InterruptedException e) { + throw new IOException("Could not write Record.", e); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java new file mode 100644 index 0000000..a7ae428 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.java.hadoop.mapreduce; + +import org.apache.flink.api.common.io.FinalizeOnMaster; +import org.apache.flink.api.common.io.OutputFormat; +import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + + +public abstract class HadoopOutputFormatBase<K, V, T> implements OutputFormat<T>, FinalizeOnMaster { + + private static final long serialVersionUID = 1L; + + private org.apache.hadoop.conf.Configuration configuration; + private org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat; + protected transient RecordWriter<K,V> recordWriter; + private transient FileOutputCommitter fileOutputCommitter; + private transient TaskAttemptContext context; + private transient int taskNumber; + + public HadoopOutputFormatBase(org.apache.hadoop.mapreduce.OutputFormat<K, V> mapreduceOutputFormat, Job job) { + super(); + this.mapreduceOutputFormat = mapreduceOutputFormat; + this.configuration = job.getConfiguration(); + HadoopUtils.mergeHadoopConf(configuration); + } + + public org.apache.hadoop.conf.Configuration getConfiguration() { + return this.configuration; + } + + // -------------------------------------------------------------------------------------------- + // OutputFormat + // 
-------------------------------------------------------------------------------------------- + + @Override + public void configure(Configuration parameters) { + // nothing to do + } + + /** + * create the temporary output file for hadoop RecordWriter. + * @param taskNumber The number of the parallel instance. + * @param numTasks The number of parallel tasks. + * @throws java.io.IOException + */ + @Override + public void open(int taskNumber, int numTasks) throws IOException { + if (Integer.toString(taskNumber + 1).length() > 6) { + throw new IOException("Task id too large."); + } + + this.taskNumber = taskNumber+1; + + // for hadoop 2.2 + this.configuration.set("mapreduce.output.basename", "tmp"); + + TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_" + + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0") + + Integer.toString(taskNumber + 1) + + "_0"); + + this.configuration.set("mapred.task.id", taskAttemptID.toString()); + this.configuration.setInt("mapred.task.partition", taskNumber + 1); + // for hadoop 2.2 + this.configuration.set("mapreduce.task.attempt.id", taskAttemptID.toString()); + this.configuration.setInt("mapreduce.task.partition", taskNumber + 1); + + try { + this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID); + } catch (Exception e) { + throw new RuntimeException(e); + } + + this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), context); + + try { + this.fileOutputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration, new JobID())); + } catch (Exception e) { + throw new RuntimeException(e); + } + + // compatible for hadoop 2.2.0, the temporary output directory is different from hadoop 1.2.1 + this.configuration.set("mapreduce.task.output.dir", this.fileOutputCommitter.getWorkPath().toString()); + + try { + this.recordWriter = 
this.mapreduceOutputFormat.getRecordWriter(this.context); + } catch (InterruptedException e) { + throw new IOException("Could not create RecordWriter.", e); + } + } + + /** + * commit the task by moving the output file out from the temporary directory. + * @throws java.io.IOException + */ + @Override + public void close() throws IOException { + try { + this.recordWriter.close(this.context); + } catch (InterruptedException e) { + throw new IOException("Could not close RecordReader.", e); + } + + if (this.fileOutputCommitter.needsTaskCommit(this.context)) { + this.fileOutputCommitter.commitTask(this.context); + } + + Path outputPath = new Path(this.configuration.get("mapred.output.dir")); + + // rename tmp-file to final name + FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration); + + String taskNumberStr = Integer.toString(this.taskNumber); + String tmpFileTemplate = "tmp-r-00000"; + String tmpFile = tmpFileTemplate.substring(0,11-taskNumberStr.length())+taskNumberStr; + + if(fs.exists(new Path(outputPath.toString()+"/"+tmpFile))) { + fs.rename(new Path(outputPath.toString()+"/"+tmpFile), new Path(outputPath.toString()+"/"+taskNumberStr)); + } + } + + @Override + public void finalizeGlobal(int parallelism) throws IOException { + + JobContext jobContext; + TaskAttemptContext taskContext; + try { + + TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_" + + String.format("%" + (6 - Integer.toString(1).length()) + "s"," ").replace(" ", "0") + + Integer.toString(1) + + "_0"); + + jobContext = HadoopUtils.instantiateJobContext(this.configuration, new JobID()); + taskContext = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID); + } catch (Exception e) { + throw new RuntimeException(e); + } + this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), taskContext); + + // finalize HDFS output format + this.fileOutputCommitter.commitJob(jobContext); + } + + // 
-------------------------------------------------------------------------------------------- + // Custom serialization methods + // -------------------------------------------------------------------------------------------- + + private void writeObject(ObjectOutputStream out) throws IOException { + out.writeUTF(this.mapreduceOutputFormat.getClass().getName()); + this.configuration.write(out); + } + + @SuppressWarnings("unchecked") + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + String hadoopOutputFormatClassName = in.readUTF(); + + org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration(); + configuration.readFields(in); + + if(this.configuration == null) { + this.configuration = configuration; + } + + try { + this.mapreduceOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat<K,V>) Class.forName(hadoopOutputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance(); + } catch (Exception e) { + throw new RuntimeException("Unable to instantiate the hadoop output format", e); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java new file mode 100644 index 0000000..fe8f8cc --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.hadoop.mapreduce.utils; + +import java.lang.reflect.Constructor; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; + +public class HadoopUtils { + + /** + * Merge HadoopConfiguration into Configuration. This is necessary for the HDFS configuration. 
+ */ + public static void mergeHadoopConf(Configuration configuration) { + Configuration hadoopConf = org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils.getHadoopConfiguration(); + + for (Map.Entry<String, String> e : hadoopConf) { + configuration.set(e.getKey(), e.getValue()); + } + } + + public static JobContext instantiateJobContext(Configuration configuration, JobID jobId) throws Exception { + try { + Class<?> clazz; + // for Hadoop 1.xx + if(JobContext.class.isInterface()) { + clazz = Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl", true, Thread.currentThread().getContextClassLoader()); + } + // for Hadoop 2.xx + else { + clazz = Class.forName("org.apache.hadoop.mapreduce.JobContext", true, Thread.currentThread().getContextClassLoader()); + } + Constructor<?> constructor = clazz.getConstructor(Configuration.class, JobID.class); + JobContext context = (JobContext) constructor.newInstance(configuration, jobId); + + return context; + } catch(Exception e) { + throw new Exception("Could not create instance of JobContext."); + } + } + + public static TaskAttemptContext instantiateTaskAttemptContext(Configuration configuration, TaskAttemptID taskAttemptID) throws Exception { + try { + Class<?> clazz; + // for Hadoop 1.xx + if(JobContext.class.isInterface()) { + clazz = Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl"); + } + // for Hadoop 2.xx + else { + clazz = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext"); + } + Constructor<?> constructor = clazz.getConstructor(Configuration.class, TaskAttemptID.class); + TaskAttemptContext context = (TaskAttemptContext) constructor.newInstance(configuration, taskAttemptID); + + return context; + } catch(Exception e) { + throw new Exception("Could not create instance of TaskAttemptContext."); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java 
---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java new file mode 100644 index 0000000..f2758b3 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.api.java.hadoop.mapreduce.wrapper; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +import org.apache.flink.core.io.LocatableInputSplit; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.mapreduce.JobContext; + + +public class HadoopInputSplit extends LocatableInputSplit { + + public transient org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit; + public transient JobContext jobContext; + + private int splitNumber; + + public org.apache.hadoop.mapreduce.InputSplit getHadoopInputSplit() { + return mapreduceInputSplit; + } + + + public HadoopInputSplit() { + super(); + } + + + public HadoopInputSplit(int splitNumber, org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit, JobContext jobContext) { + this.splitNumber = splitNumber; + if(!(mapreduceInputSplit instanceof Writable)) { + throw new IllegalArgumentException("InputSplit must implement Writable interface."); + } + this.mapreduceInputSplit = mapreduceInputSplit; + this.jobContext = jobContext; + } + + @Override + public void write(DataOutputView out) throws IOException { + out.writeInt(this.splitNumber); + out.writeUTF(this.mapreduceInputSplit.getClass().getName()); + Writable w = (Writable) this.mapreduceInputSplit; + w.write(out); + } + + @Override + public void read(DataInputView in) throws IOException { + this.splitNumber = in.readInt(); + String className = in.readUTF(); + + if(this.mapreduceInputSplit == null) { + try { + Class<? 
extends org.apache.hadoop.io.Writable> inputSplit = + Class.forName(className).asSubclass(org.apache.hadoop.io.Writable.class); + this.mapreduceInputSplit = (org.apache.hadoop.mapreduce.InputSplit) WritableFactories.newInstance(inputSplit); + } catch (Exception e) { + throw new RuntimeException("Unable to create InputSplit", e); + } + } + ((Writable)this.mapreduceInputSplit).readFields(in); + } + + private void writeObject(ObjectOutputStream out) throws IOException { + out.writeInt(this.splitNumber); + out.writeUTF(this.mapreduceInputSplit.getClass().getName()); + Writable w = (Writable) this.mapreduceInputSplit; + w.write(out); + + } + + @SuppressWarnings("unchecked") + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + this.splitNumber=in.readInt(); + String className = in.readUTF(); + + if(this.mapreduceInputSplit == null) { + try { + Class<? extends org.apache.hadoop.io.Writable> inputSplit = + Class.forName(className).asSubclass(org.apache.hadoop.io.Writable.class); + this.mapreduceInputSplit = (org.apache.hadoop.mapreduce.InputSplit) WritableFactories.newInstance(inputSplit); + } catch (Exception e) { + throw new RuntimeException("Unable to create InputSplit", e); + } + } + ((Writable)this.mapreduceInputSplit).readFields(in); + } + + @Override + public int getSplitNumber() { + return this.splitNumber; + } + + @Override + public String[] getHostnames() { + try { + return this.mapreduceInputSplit.getLocations(); + } catch (IOException e) { + return new String[0]; + } catch (InterruptedException e) { + return new String[0]; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java new file mode 100644 index 0000000..89aa67e --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.api.java.hadoop.mapred; + + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapred.FileInputFormat; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.fail; + + +public class HadoopInputFormatTest { + + + public class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> { + + public DummyVoidKeyInputFormat() { + } + + @Override + public org.apache.hadoop.mapred.RecordReader<Void, T> getRecordReader(org.apache.hadoop.mapred.InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException { + return null; + } + } + + + @Test + public void checkTypeInformation() { + try { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // Set up the Hadoop Input Format + Job job = Job.getInstance(); + HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<Void, Long>( new DummyVoidKeyInputFormat(), Void.class, Long.class, new JobConf()); + + TypeInformation<Tuple2<Void,Long>> tupleType = hadoopInputFormat.getProducedType(); + TypeInformation<Tuple2<Void,Long>> testTupleType = new TupleTypeInfo<Tuple2<Void,Long>>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO); + + if(tupleType.isTupleType()) { + if(!((TupleTypeInfo)tupleType).equals(testTupleType)) { + fail("Tuple type information was not set correctly!"); + } + } else { + fail("Type information was not set to tuple type information!"); + } + + } + catch (Exception ex) { + fail("Test failed due to a " + 
ex.getClass().getSimpleName() + ": " + ex.getMessage()); + } + + } + +}
