http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java new file mode 100644 index 0000000..f2758b3 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.java.hadoop.mapreduce.wrapper; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +import org.apache.flink.core.io.LocatableInputSplit; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.mapreduce.JobContext; + + +public class HadoopInputSplit extends LocatableInputSplit { + + public transient org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit; + public transient JobContext jobContext; + + private int splitNumber; + + public org.apache.hadoop.mapreduce.InputSplit getHadoopInputSplit() { + return mapreduceInputSplit; + } + + + public HadoopInputSplit() { + super(); + } + + + public HadoopInputSplit(int splitNumber, org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit, JobContext jobContext) { + this.splitNumber = splitNumber; + if(!(mapreduceInputSplit instanceof Writable)) { + throw new IllegalArgumentException("InputSplit must implement Writable interface."); + } + this.mapreduceInputSplit = mapreduceInputSplit; + this.jobContext = jobContext; + } + + @Override + public void write(DataOutputView out) throws IOException { + out.writeInt(this.splitNumber); + out.writeUTF(this.mapreduceInputSplit.getClass().getName()); + Writable w = (Writable) this.mapreduceInputSplit; + w.write(out); + } + + @Override + public void read(DataInputView in) throws IOException { + this.splitNumber = in.readInt(); + String className = in.readUTF(); + + if(this.mapreduceInputSplit == null) { + try { + Class<? extends org.apache.hadoop.io.Writable> inputSplit = + Class.forName(className).asSubclass(org.apache.hadoop.io.Writable.class); + this.mapreduceInputSplit = (org.apache.hadoop.mapreduce.InputSplit) WritableFactories.newInstance(inputSplit); + } catch (Exception e) { + throw new RuntimeException("Unable to create InputSplit", e); + } + } + ((Writable)this.mapreduceInputSplit).readFields(in); + } + + private void writeObject(ObjectOutputStream out) throws IOException { + out.writeInt(this.splitNumber); + out.writeUTF(this.mapreduceInputSplit.getClass().getName()); + Writable w = (Writable) this.mapreduceInputSplit; + w.write(out); + + } + + @SuppressWarnings("unchecked") + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + this.splitNumber=in.readInt(); + String className = in.readUTF(); + + if(this.mapreduceInputSplit == null) { + try { + Class<? extends org.apache.hadoop.io.Writable> inputSplit = + Class.forName(className).asSubclass(org.apache.hadoop.io.Writable.class); + this.mapreduceInputSplit = (org.apache.hadoop.mapreduce.InputSplit) WritableFactories.newInstance(inputSplit); + } catch (Exception e) { + throw new RuntimeException("Unable to create InputSplit", e); + } + } + ((Writable)this.mapreduceInputSplit).readFields(in); + } + + @Override + public int getSplitNumber() { + return this.splitNumber; + } + + @Override + public String[] getHostnames() { + try { + return this.mapreduceInputSplit.getLocations(); + } catch (IOException e) { + return new String[0]; + } catch (InterruptedException e) { + return new String[0]; + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java new file mode 100644 index 0000000..89aa67e --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.java.hadoop.mapred; + + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapred.FileInputFormat; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.fail; + + +public class HadoopInputFormatTest { + + + public class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> { + + public DummyVoidKeyInputFormat() { + } + + @Override + public org.apache.hadoop.mapred.RecordReader<Void, T> getRecordReader(org.apache.hadoop.mapred.InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException { + return null; + } + } + + + @Test + public void checkTypeInformation() { + try { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // Set up the Hadoop Input Format + Job job = Job.getInstance(); + HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<Void, Long>( new DummyVoidKeyInputFormat(), Void.class, Long.class, new JobConf()); + + TypeInformation<Tuple2<Void,Long>> tupleType = hadoopInputFormat.getProducedType(); + TypeInformation<Tuple2<Void,Long>> testTupleType = new TupleTypeInfo<Tuple2<Void,Long>>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO); + + if(tupleType.isTupleType()) { + if(!((TupleTypeInfo)tupleType).equals(testTupleType)) { + fail("Tuple type information was not set correctly!"); + } + } else { + fail("Type information was not set to tuple type information!"); + } + + } + catch (Exception ex) { + fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); + } + + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java new file mode 100644 index 0000000..236d149 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.java.hadoop.mapreduce; + + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.fail; + + + +public class HadoopInputFormatTest { + + + public class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> { + + public DummyVoidKeyInputFormat() { + } + + @Override + public RecordReader<Void, T> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + return null; + } + } + + + @Test + public void checkTypeInformation() { + try { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // Set up the Hadoop Input Format + Job job = Job.getInstance(); + HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<Void, Long>( new DummyVoidKeyInputFormat(), Void.class, Long.class, job); + + TypeInformation<Tuple2<Void,Long>> tupleType = hadoopInputFormat.getProducedType(); + TypeInformation<Tuple2<Void,Long>> testTupleType = new TupleTypeInfo<Tuple2<Void,Long>>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO); + + if(tupleType.isTupleType()) { + if(!((TupleTypeInfo)tupleType).equals(testTupleType)) { + fail("Tuple type information was not set correctly!"); + } + } else { + fail("Type information was not set to tuple type information!"); + } + + } + catch (Exception ex) { + fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); + } + + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala index 6305619..e193770 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala @@ -27,16 +27,23 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeinfo.BasicTypeInfo import org.apache.flink.api.java.typeutils.runtime.KryoSerializer import org.apache.flink.api.java.typeutils.{ValueTypeInfo, TupleTypeInfoBase} +import org.apache.flink.api.scala.hadoop.mapred +import org.apache.flink.api.scala.hadoop.mapreduce import org.apache.flink.api.scala.operators.ScalaCsvInputFormat import org.apache.flink.core.fs.Path -import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv, -CollectionEnvironment} +import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv, CollectionEnvironment} import org.apache.flink.api.common.io.{InputFormat, FileInputFormat} import org.apache.flink.api.java.operators.DataSource import org.apache.flink.types.StringValue import org.apache.flink.util.{NumberSequenceIterator, SplittableIterator} +import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => MapreduceFileInputFormat} +import org.apache.hadoop.mapreduce.{InputFormat => MapreduceInputFormat, Job} +import org.apache.hadoop.mapred.{FileInputFormat => MapredFileInputFormat, +InputFormat => MapredInputFormat, JobConf} +import org.apache.hadoop.fs.{Path => HadoopPath} + import scala.collection.JavaConverters._ @@ -304,6 +311,92 @@ class ExecutionEnvironment(javaEnv: JavaEnv) { } /** + * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.FileInputFormat]]. The + * given inputName is set on the given job. + */ + def readHadoopFile[K, V]( + mapredInputFormat: MapredFileInputFormat[K, V], + key: Class[K], + value: Class[V], + inputPath: String, + job: JobConf) + (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = { + val result = createHadoopInput(mapredInputFormat, key, value, job) + MapredFileInputFormat.addInputPath(job, new HadoopPath(inputPath)) + result + } + + /** + * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.FileInputFormat]]. A + * [[org.apache.hadoop.mapred.JobConf]] with the given inputPath is created. + */ + def readHadoopFile[K, V]( + mapredInputFormat: MapredFileInputFormat[K, V], + key: Class[K], + value: Class[V], + inputPath: String) + (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = { + readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf) + } + + /** + * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.InputFormat]]. + */ + def createHadoopInput[K, V]( + mapredInputFormat: MapredInputFormat[K, V], + key: Class[K], + value: Class[V], + job: JobConf) + (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = { + val hadoopInputFormat = new mapred.HadoopInputFormat[K, V](mapredInputFormat, key, value, job) + createInput(hadoopInputFormat) + } + + /** + * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]]. + * The given inputName is set on the given job. + */ + def readHadoopFile[K, V]( + mapredInputFormat: MapreduceFileInputFormat[K, V], + key: Class[K], + value: Class[V], + inputPath: String, + job: Job) + (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = { + val result = createHadoopInput(mapredInputFormat, key, value, job) + MapreduceFileInputFormat.addInputPath(job, new HadoopPath(inputPath)) + result + } + + /** + * Creates a [[DataSet]] from the given + * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]]. A + * [[org.apache.hadoop.mapreduce.Job]] with the given inputPath will be created. + */ + def readHadoopFile[K, V]( + mapredInputFormat: MapreduceFileInputFormat[K, V], + key: Class[K], + value: Class[V], + inputPath: String) + (implicit tpe: TypeInformation[(K, V)]): DataSet[Tuple2[K, V]] = { + readHadoopFile(mapredInputFormat, key, value, inputPath, Job.getInstance) + } + + /** + * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapreduce.InputFormat]]. + */ + def createHadoopInput[K, V]( + mapredInputFormat: MapreduceInputFormat[K, V], + key: Class[K], + value: Class[V], + job: Job) + (implicit tpe: TypeInformation[(K, V)]): DataSet[Tuple2[K, V]] = { + val hadoopInputFormat = + new mapreduce.HadoopInputFormat[K, V](mapredInputFormat, key, value, job) + createInput(hadoopInputFormat) + } + + /** * Creates a DataSet from the given non-empty [[Seq]]. The elements need to be serializable * because the framework may move the elements into the cluster if needed. * http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala new file mode 100644 index 0000000..d03e433 --- /dev/null +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.scala.hadoop.mapred + +import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase +import org.apache.hadoop.mapred.{JobConf, InputFormat} + +class HadoopInputFormat[K, V]( + mapredInputFormat: InputFormat[K, V], + keyClass: Class[K], + valueClass: Class[V], + job: JobConf) + extends HadoopInputFormatBase[K, V, (K, V)](mapredInputFormat, keyClass, valueClass, job) { + + def nextRecord(reuse: (K, V)): (K, V) = { + if (!fetched) { + fetchNext() + } + if (!hasNext) { + return null + } + fetched = false + (key, value) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala new file mode 100644 index 0000000..56b7a7f --- /dev/null +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.scala.hadoop.mapred + +import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase +import org.apache.hadoop.mapred.{JobConf, OutputFormat} + +class HadoopOutputFormat[K, V](mapredOutputFormat: OutputFormat[K, V], job: JobConf) + extends HadoopOutputFormatBase[K, V, (K, V)](mapredOutputFormat, job) { + + def writeRecord(record: (K, V)) { + this.recordWriter.write(record._1, record._2) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala new file mode 100644 index 0000000..9c94f29 --- /dev/null +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.hadoop.mapreduce + +import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase +import org.apache.hadoop.mapreduce.{InputFormat, Job} + +class HadoopInputFormat[K, V]( + mapredInputFormat: InputFormat[K, V], + keyClass: Class[K], + valueClass: Class[V], + job: Job) + extends HadoopInputFormatBase[K, V, (K, V)](mapredInputFormat, keyClass, valueClass, job) { + + def nextRecord(reuse: (K, V)): (K, V) = { + if (!fetched) { + fetchNext() + } + if (!hasNext) { + return null + } + fetched = false + (recordReader.getCurrentKey, recordReader.getCurrentValue) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala new file mode 100644 index 0000000..b8ba3c1 --- /dev/null +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.hadoop.mapreduce + +import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase +import org.apache.hadoop.mapreduce.{Job, OutputFormat} + +class HadoopOutputFormat[K, V](mapredOutputFormat: OutputFormat[K, V], job: Job) + extends HadoopOutputFormatBase[K, V, (K, V)](mapredOutputFormat, job) { + + def writeRecord(record: (K, V)) { + this.recordWriter.write(record._1, record._2) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java deleted file mode 100644 index 326a1c4..0000000 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java +++ /dev/null @@ -1,297 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.hadoopcompatibility.mapred; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.ArrayList; - -import org.apache.flink.api.common.io.LocatableInputSplitAssigner; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics; -import org.apache.flink.api.common.io.statistics.BaseStatistics; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileStatus; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.Path; -import org.apache.flink.core.io.InputSplitAssigner; -import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopInputSplit; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.util.ReflectionUtils; - -public class HadoopInputFormat<K, V> implements InputFormat<Tuple2<K,V>, HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> { - - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormat.class); - - private org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat; - private Class<K> keyClass; - private Class<V> valueClass; - private JobConf jobConf; - - private transient K key; - private transient V value; - - private transient RecordReader<K, V> recordReader; - private transient boolean fetched = false; - private transient boolean hasNext; - - public HadoopInputFormat() { - super(); - } - - public HadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) { - super(); - this.mapredInputFormat = mapredInputFormat; - this.keyClass = key; - this.valueClass = value; - HadoopUtils.mergeHadoopConf(job); - this.jobConf = job; - ReflectionUtils.setConf(mapredInputFormat, jobConf); - } - - public void setJobConf(JobConf job) { - this.jobConf = job; - } - - public org.apache.hadoop.mapred.InputFormat<K,V> getHadoopInputFormat() { - return mapredInputFormat; - } - - public void setHadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat) { - this.mapredInputFormat = mapredInputFormat; - } - - public JobConf getJobConf() { - return jobConf; - } - - // -------------------------------------------------------------------------------------------- - // InputFormat - // -------------------------------------------------------------------------------------------- - - @Override - public void configure(Configuration parameters) { - // nothing to do - } - - @Override - public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException { - // only gather base statistics for FileInputFormats - if(!(mapredInputFormat instanceof FileInputFormat)) { - return null; - } - - final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ? - (FileBaseStatistics) cachedStats : null; - - try { - final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(this.jobConf); - - return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1)); - } catch (IOException ioex) { - if (LOG.isWarnEnabled()) { - LOG.warn("Could not determine statistics due to an io error: " - + ioex.getMessage()); - } - } catch (Throwable t) { - if (LOG.isErrorEnabled()) { - LOG.error("Unexpected problem while getting the file statistics: " - + t.getMessage(), t); - } - } - - // no statistics available - return null; - } - - @Override - public HadoopInputSplit[] createInputSplits(int minNumSplits) - throws IOException { - org.apache.hadoop.mapred.InputSplit[] splitArray = mapredInputFormat.getSplits(jobConf, minNumSplits); - HadoopInputSplit[] hiSplit = new HadoopInputSplit[splitArray.length]; - for(int i=0;i<splitArray.length;i++){ - hiSplit[i] = new HadoopInputSplit(i, splitArray[i], jobConf); - } - return hiSplit; - } - - @Override - public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) { - return new LocatableInputSplitAssigner(inputSplits); - } - - @Override - public void open(HadoopInputSplit split) throws IOException { - this.recordReader = this.mapredInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter()); - if (this.recordReader instanceof Configurable) { - ((Configurable) this.recordReader).setConf(jobConf); - } - key = this.recordReader.createKey(); - value = this.recordReader.createValue(); - this.fetched = false; - } - - @Override - public boolean reachedEnd() throws IOException { - if(!fetched) { - fetchNext(); - } - return !hasNext; - } - - private void fetchNext() throws IOException { - hasNext = this.recordReader.next(key, value); - fetched = true; - } - - @Override - public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException { - if(!fetched) { - fetchNext(); - } - if(!hasNext) { - return null; - } - record.f0 = key; - record.f1 = value; - fetched = false; - return record; - } - - @Override - public void close() throws IOException { - this.recordReader.close(); - } - - // -------------------------------------------------------------------------------------------- - // Helper methods - // -------------------------------------------------------------------------------------------- - - private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths, - ArrayList<FileStatus> files) throws IOException { - - long latestModTime = 0L; - - // get the file info and check whether the cached statistics are still valid. - for(org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) { - - final Path filePath = new Path(hadoopPath.toUri()); - final FileSystem fs = FileSystem.get(filePath.toUri()); - - final FileStatus file = fs.getFileStatus(filePath); - latestModTime = Math.max(latestModTime, file.getModificationTime()); - - // enumerate all files and check their modification time stamp. - if (file.isDir()) { - FileStatus[] fss = fs.listStatus(filePath); - files.ensureCapacity(files.size() + fss.length); - - for (FileStatus s : fss) { - if (!s.isDir()) { - files.add(s); - latestModTime = Math.max(s.getModificationTime(), latestModTime); - } - } - } else { - files.add(file); - } - } - - // check whether the cached statistics are still valid, if we have any - if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) { - return cachedStats; - } - - // calculate the whole length - long len = 0; - for (FileStatus s : files) { - len += s.getLen(); - } - - // sanity check - if (len <= 0) { - len = BaseStatistics.SIZE_UNKNOWN; - } - - return new FileBaseStatistics(latestModTime, len, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN); - } - - // -------------------------------------------------------------------------------------------- - // Custom serialization methods - // -------------------------------------------------------------------------------------------- - - private void writeObject(ObjectOutputStream out) throws IOException { - out.writeUTF(mapredInputFormat.getClass().getName()); - out.writeUTF(keyClass.getName()); - out.writeUTF(valueClass.getName()); - jobConf.write(out); - } - - @SuppressWarnings("unchecked") - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - String hadoopInputFormatClassName = in.readUTF(); - String keyClassName = in.readUTF(); - String valueClassName = in.readUTF(); - if(jobConf == null) { - jobConf = new JobConf(); - } - jobConf.readFields(in); - try { - this.mapredInputFormat = (org.apache.hadoop.mapred.InputFormat<K,V>) Class.forName(hadoopInputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance(); - } catch (Exception e) { - throw new RuntimeException("Unable to instantiate the hadoop input format", e); - } - try { - this.keyClass = (Class<K>) Class.forName(keyClassName, true, Thread.currentThread().getContextClassLoader()); - } catch (Exception e) { - throw new RuntimeException("Unable to find key class.", e); - } - try { - this.valueClass = (Class<V>) Class.forName(valueClassName, true, Thread.currentThread().getContextClassLoader()); - } catch (Exception e) { - throw new RuntimeException("Unable to find value class.", e); - } - ReflectionUtils.setConf(mapredInputFormat, jobConf); - } - - // -------------------------------------------------------------------------------------------- - // ResultTypeQueryable - // -------------------------------------------------------------------------------------------- - - @Override - public TypeInformation<Tuple2<K,V>> getProducedType() { - return new TupleTypeInfo<Tuple2<K,V>>(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass)); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java index dfe0067..4d81daf 100644 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java +++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java @@ -30,7 +30,7 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.configuration.Configuration; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter; +import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter; import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector; import org.apache.flink.util.Collector; import org.apache.flink.util.InstantiationUtil; http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java deleted file mode 100644 index f3abfcd..0000000 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.hadoopcompatibility.mapred; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; - -import org.apache.flink.api.common.io.FinalizeOnMaster; -import org.apache.flink.api.common.io.OutputFormat; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyProgressable; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter; -import org.apache.hadoop.mapred.FileOutputCommitter; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobContext; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.RecordWriter; -import org.apache.hadoop.mapred.TaskAttemptContext; -import org.apache.hadoop.mapred.TaskAttemptID; -import org.apache.hadoop.util.ReflectionUtils; - - -public class HadoopOutputFormat<K,V> implements OutputFormat<Tuple2<K,V>>, FinalizeOnMaster { - - private static final long serialVersionUID = 1L; - - private JobConf jobConf; - private org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat; - private transient RecordWriter<K,V> recordWriter; - private transient FileOutputCommitter fileOutputCommitter; - private transient TaskAttemptContext context; - private transient JobContext jobContext; - - public HadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat, JobConf job) { - super(); - this.mapredOutputFormat = mapredOutputFormat; - HadoopUtils.mergeHadoopConf(job); - this.jobConf = job; - } - - public void setJobConf(JobConf job) { - this.jobConf = job; - } - - public JobConf getJobConf() { - return jobConf; - } - - public org.apache.hadoop.mapred.OutputFormat<K,V> getHadoopOutputFormat() { - return mapredOutputFormat; - } - - public void setHadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat) { - this.mapredOutputFormat = mapredOutputFormat; - } - - // -------------------------------------------------------------------------------------------- - // OutputFormat - // -------------------------------------------------------------------------------------------- - - @Override - public void configure(Configuration parameters) { - // nothing to do - } - - /** - * create the temporary output file for hadoop RecordWriter. - * @param taskNumber The number of the parallel instance. - * @param numTasks The number of parallel tasks. - * @throws IOException - */ - @Override - public void open(int taskNumber, int numTasks) throws IOException { - if (Integer.toString(taskNumber + 1).length() > 6) { - throw new IOException("Task id too large."); - } - - TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_" - + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0") - + Integer.toString(taskNumber + 1) - + "_0"); - - this.jobConf.set("mapred.task.id", taskAttemptID.toString()); - this.jobConf.setInt("mapred.task.partition", taskNumber + 1); - // for hadoop 2.2 - this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString()); - this.jobConf.setInt("mapreduce.task.partition", taskNumber + 1); - - try { - this.context = HadoopUtils.instantiateTaskAttemptContext(this.jobConf, taskAttemptID); - } catch (Exception e) { - throw new RuntimeException(e); - } - - this.fileOutputCommitter = new FileOutputCommitter(); - - try { - this.jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID()); - } catch (Exception e) { - throw new RuntimeException(e); - } - - this.fileOutputCommitter.setupJob(jobContext); - - this.recordWriter = this.mapredOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable()); - } - - @Override - public void writeRecord(Tuple2<K, V> record) throws IOException { - this.recordWriter.write(record.f0, record.f1); - } - - /** - * commit the task by moving the output file out from the temporary directory. - * @throws IOException - */ - @Override - public void close() throws IOException { - this.recordWriter.close(new HadoopDummyReporter()); - - if (this.fileOutputCommitter.needsTaskCommit(this.context)) { - this.fileOutputCommitter.commitTask(this.context); - } - } - - @Override - public void finalizeGlobal(int parallelism) throws IOException { - - try { - JobContext jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID()); - FileOutputCommitter fileOutputCommitter = new FileOutputCommitter(); - - // finalize HDFS output format - fileOutputCommitter.commitJob(jobContext); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - // -------------------------------------------------------------------------------------------- - // Custom serialization methods - // -------------------------------------------------------------------------------------------- - - private void writeObject(ObjectOutputStream out) throws IOException { - out.writeUTF(mapredOutputFormat.getClass().getName()); - jobConf.write(out); - } - - @SuppressWarnings("unchecked") - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - String hadoopOutputFormatName = in.readUTF(); - if(jobConf == null) { - jobConf = new JobConf(); - } - jobConf.readFields(in); - try { - this.mapredOutputFormat = (org.apache.hadoop.mapred.OutputFormat<K,V>) Class.forName(hadoopOutputFormatName, true, Thread.currentThread().getContextClassLoader()).newInstance(); - } catch (Exception e) { - throw new RuntimeException("Unable to instantiate the hadoop output format", e); - } - ReflectionUtils.setConf(mapredOutputFormat, jobConf); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java index aa9f048..376fd70 100644 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java +++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java @@ -30,7 +30,7 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.configuration.Configuration; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter; +import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter; import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector; import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator; import org.apache.flink.util.Collector; http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java index d9797c3..fd2c493 100644 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java +++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java @@ -30,7 +30,7 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.configuration.Configuration; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter; +import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter; import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector; import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator; import org.apache.flink.util.Collector; http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java index de20fab..3547e47 100644 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java +++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java @@ -24,9 +24,9 @@ import java.util.Iterator; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat; +import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat; import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction; -import org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat; +import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat; import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java index 275fd4c..edcc43b 100644 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java +++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java @@ -29,9 +29,9 @@ import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.HadoopTypeConverter; -import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopInputSplit; +import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils; +import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter; +import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopInputSplit; import org.apache.flink.types.Record; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java index 74118a3..e519062 100644 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java +++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java @@ -27,9 +27,9 @@ import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.configuration.Configuration; import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.HadoopFileOutputCommitter; import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.FlinkTypeConverter; -import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyProgressable; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter; +import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils; +import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyProgressable; +import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter; import org.apache.flink.types.Record; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordWriter; http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java deleted file mode 100644 index 2d2f518..0000000 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.hadoopcompatibility.mapred.utils; - -import java.lang.reflect.Constructor; -import java.util.Map; - -import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobContext; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.TaskAttemptContext; -import org.apache.hadoop.mapred.TaskAttemptID; - - -public class HadoopUtils { - - /** - * Merge HadoopConfiguration into JobConf. This is necessary for the HDFS configuration. - */ - public static void mergeHadoopConf(JobConf jobConf) { - org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration(); - for (Map.Entry<String, String> e : hadoopConf) { - jobConf.set(e.getKey(), e.getValue()); - } - } - - public static JobContext instantiateJobContext(JobConf jobConf, JobID jobId) throws Exception { - try { - // for Hadoop 1.xx - Class<?> clazz = null; - if(!TaskAttemptContext.class.isInterface()) { - clazz = Class.forName("org.apache.hadoop.mapred.JobContext", true, Thread.currentThread().getContextClassLoader()); - } - // for Hadoop 2.xx - else { - clazz = Class.forName("org.apache.hadoop.mapred.JobContextImpl", true, Thread.currentThread().getContextClassLoader()); - } - Constructor<?> constructor = clazz.getDeclaredConstructor(JobConf.class, org.apache.hadoop.mapreduce.JobID.class); - // for Hadoop 1.xx - constructor.setAccessible(true); - JobContext context = (JobContext) constructor.newInstance(jobConf, jobId); - - return context; - } catch(Exception e) { - throw new Exception("Could not create instance of JobContext.", e); - } - } - - public static TaskAttemptContext instantiateTaskAttemptContext(JobConf jobConf, TaskAttemptID taskAttemptID) throws Exception { - try { - // for Hadoop 1.xx - Class<?> clazz = null; - if(!TaskAttemptContext.class.isInterface()) { - clazz = Class.forName("org.apache.hadoop.mapred.TaskAttemptContext", true, Thread.currentThread().getContextClassLoader()); - } - // for Hadoop 2.xx - else { - clazz = Class.forName("org.apache.hadoop.mapred.TaskAttemptContextImpl", true, Thread.currentThread().getContextClassLoader()); - } - Constructor<?> constructor = clazz.getDeclaredConstructor(JobConf.class, TaskAttemptID.class); - // for Hadoop 1.xx - constructor.setAccessible(true); - TaskAttemptContext context = (TaskAttemptContext) constructor.newInstance(jobConf, taskAttemptID); - return context; - } catch(Exception e) { - throw new Exception("Could not create instance of TaskAttemptContext.", e); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java deleted file mode 100644 index 483dd2f..0000000 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.hadoopcompatibility.mapred.wrapper; - -import org.apache.hadoop.util.Progressable; - -/** - * This is a dummy progress - * - */ -public class HadoopDummyProgressable implements Progressable { - @Override - public void progress() { - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java deleted file mode 100644 index 84a1e9e..0000000 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.hadoopcompatibility.mapred.wrapper; - -import org.apache.hadoop.mapred.Counters.Counter; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.Reporter; - -/** - * This is a dummy progress monitor / reporter - * - */ -public class HadoopDummyReporter implements Reporter { - - @Override - public void progress() { - } - - @Override - public void setStatus(String status) { - - } - - @Override - public Counter getCounter(Enum<?> name) { - return null; - } - - @Override - public Counter getCounter(String group, String name) { - return null; - } - - @Override - public void incrCounter(Enum<?> key, long amount) { - - } - - @Override - public void incrCounter(String group, String counter, long amount) { - - } - - @Override - public InputSplit getInputSplit() throws UnsupportedOperationException { - return null; - } - // There should be an @Override, but some CDH4 dependency does not contain this method - public float getProgress() { - return 0; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java deleted file mode 100644 index aa2155d..0000000 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.hadoopcompatibility.mapred.wrapper; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; - -import org.apache.flink.core.io.LocatableInputSplit; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.io.WritableFactories; -import org.apache.hadoop.mapred.JobConf; - - -public class HadoopInputSplit extends LocatableInputSplit { - - private static final long serialVersionUID = 1L; - - - private transient org.apache.hadoop.mapred.InputSplit hadoopInputSplit; - - private JobConf jobConf; - - private int splitNumber; - private String hadoopInputSplitTypeName; - - - public org.apache.hadoop.mapred.InputSplit getHadoopInputSplit() { - return hadoopInputSplit; - } - - public HadoopInputSplit() { - super(); - } - - public HadoopInputSplit(int splitNumber, org.apache.hadoop.mapred.InputSplit hInputSplit, JobConf jobconf) { - - this.splitNumber = splitNumber; - this.hadoopInputSplit = hInputSplit; - this.hadoopInputSplitTypeName = hInputSplit.getClass().getName(); - this.jobConf = jobconf; - - } - - @Override - public void write(DataOutputView out) throws IOException { - out.writeInt(splitNumber); - out.writeUTF(hadoopInputSplitTypeName); - jobConf.write(out); - hadoopInputSplit.write(out); - } - - @Override - public void read(DataInputView in) throws IOException { - this.splitNumber = in.readInt(); - this.hadoopInputSplitTypeName = in.readUTF(); - if(hadoopInputSplit == null) { - try { - Class<? extends org.apache.hadoop.io.Writable> inputSplit = - Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class); - this.hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance( inputSplit ); - } - catch (Exception e) { - throw new RuntimeException("Unable to create InputSplit", e); - } - } - jobConf = new JobConf(); - jobConf.readFields(in); - if (this.hadoopInputSplit instanceof Configurable) { - ((Configurable) this.hadoopInputSplit).setConf(this.jobConf); - } - this.hadoopInputSplit.readFields(in); - - } - - private void writeObject(ObjectOutputStream out) throws IOException { - out.writeInt(splitNumber); - out.writeUTF(hadoopInputSplitTypeName); - jobConf.write(out); - hadoopInputSplit.write(out); - - } - - @SuppressWarnings("unchecked") - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - this.splitNumber=in.readInt(); - this.hadoopInputSplitTypeName = in.readUTF(); - if(hadoopInputSplit == null) { - try { - Class<? extends org.apache.hadoop.io.Writable> inputSplit = - Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class); - this.hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance( inputSplit ); - } - catch (Exception e) { - throw new RuntimeException("Unable to create InputSplit", e); - } - } - jobConf = new JobConf(); - jobConf.readFields(in); - if (this.hadoopInputSplit instanceof Configurable) { - ((Configurable) this.hadoopInputSplit).setConf(this.jobConf); - } - this.hadoopInputSplit.readFields(in); - } - - @Override - public int getSplitNumber() { - return this.splitNumber; - } - - @Override - public String[] getHostnames() { - try { - return this.hadoopInputSplit.getLocations(); - } catch(IOException ioe) { - return new String[0]; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java deleted file mode 100644 index 20006b8..0000000 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java +++ /dev/null @@ -1,338 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.hadoopcompatibility.mapreduce; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.ArrayList; -import java.util.List; - -import org.apache.flink.api.common.io.LocatableInputSplitAssigner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics; -import org.apache.flink.api.common.io.statistics.BaseStatistics; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileStatus; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.Path; -import org.apache.flink.core.io.InputSplitAssigner; -import org.apache.flink.hadoopcompatibility.mapreduce.utils.HadoopUtils; -import org.apache.flink.hadoopcompatibility.mapreduce.wrapper.HadoopInputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.flink.api.java.typeutils.TypeExtractor; - -public class HadoopInputFormat<K, V> implements InputFormat<Tuple2<K,V>, HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> { - - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormat.class); - - private org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat; - private Class<K> keyClass; - private Class<V> valueClass; - private org.apache.hadoop.conf.Configuration configuration; - - private transient RecordReader<K, V> recordReader; - private boolean fetched = false; - private boolean hasNext; - - public HadoopInputFormat() { - super(); - } - - public HadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) { - super(); - this.mapreduceInputFormat = mapreduceInputFormat; - this.keyClass = key; - this.valueClass = value; - this.configuration = job.getConfiguration(); - HadoopUtils.mergeHadoopConf(configuration); - } - - public void setConfiguration(org.apache.hadoop.conf.Configuration configuration) { - this.configuration = configuration; - } - - public org.apache.hadoop.mapreduce.InputFormat<K,V> getHadoopInputFormat() { - return this.mapreduceInputFormat; - } - - public void setHadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat) { - this.mapreduceInputFormat = mapreduceInputFormat; - } - - public org.apache.hadoop.conf.Configuration getConfiguration() { - return this.configuration; - } - - // -------------------------------------------------------------------------------------------- - // InputFormat - // -------------------------------------------------------------------------------------------- - - @Override - public void configure(Configuration parameters) { - // nothing to do - } - - @Override - public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException { - // only gather base statistics for FileInputFormats - if(!(mapreduceInputFormat instanceof FileInputFormat)) { - return null; - } - - JobContext jobContext = null; - try { - jobContext = HadoopUtils.instantiateJobContext(configuration, null); - } catch (Exception e) { - throw new RuntimeException(e); - } - - final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ? - (FileBaseStatistics) cachedStats : null; - - try { - final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(jobContext); - return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1)); - } catch (IOException ioex) { - if (LOG.isWarnEnabled()) { - LOG.warn("Could not determine statistics due to an io error: " - + ioex.getMessage()); - } - } catch (Throwable t) { - if (LOG.isErrorEnabled()) { - LOG.error("Unexpected problem while getting the file statistics: " - + t.getMessage(), t); - } - } - - // no statistics available - return null; - } - - @Override - public HadoopInputSplit[] createInputSplits(int minNumSplits) - throws IOException { - configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits); - - JobContext jobContext = null; - try { - jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID()); - } catch (Exception e) { - throw new RuntimeException(e); - } - - List<org.apache.hadoop.mapreduce.InputSplit> splits; - try { - splits = this.mapreduceInputFormat.getSplits(jobContext); - } catch (InterruptedException e) { - throw new IOException("Could not get Splits.", e); - } - HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()]; - - for(int i = 0; i < hadoopInputSplits.length; i++){ - hadoopInputSplits[i] = new HadoopInputSplit(i, splits.get(i), jobContext); - } - return hadoopInputSplits; - } - - @Override - public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) { - return new LocatableInputSplitAssigner(inputSplits); - } - - @Override - public void open(HadoopInputSplit split) throws IOException { - TaskAttemptContext context = null; - try { - context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID()); - } catch(Exception e) { - throw new RuntimeException(e); - } - - try { - this.recordReader = this.mapreduceInputFormat - .createRecordReader(split.getHadoopInputSplit(), context); - this.recordReader.initialize(split.getHadoopInputSplit(), context); - } catch (InterruptedException e) { - throw new IOException("Could not create RecordReader.", e); - } finally { - this.fetched = false; - } - } - - @Override - public boolean reachedEnd() throws IOException { - if(!this.fetched) { - fetchNext(); - } - return !this.hasNext; - } - - private void fetchNext() throws IOException { - try { - this.hasNext = this.recordReader.nextKeyValue(); - } catch (InterruptedException e) { - throw new IOException("Could not fetch next KeyValue pair.", e); - } finally { - this.fetched = true; - } - } - - @Override - public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException { - if(!this.fetched) { - fetchNext(); - } - if(!this.hasNext) { - return null; - } - try { - record.f0 = this.recordReader.getCurrentKey(); - record.f1 = this.recordReader.getCurrentValue(); - } catch (InterruptedException e) { - throw new IOException("Could not get KeyValue pair.", e); - } - this.fetched = false; - - return record; - } - - @Override - public void close() throws IOException { - this.recordReader.close(); - } - - // -------------------------------------------------------------------------------------------- - // Helper methods - // -------------------------------------------------------------------------------------------- - - private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths, - ArrayList<FileStatus> files) throws IOException { - - long latestModTime = 0L; - - // get the file info and check whether the cached statistics are still valid. - for(org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) { - - final Path filePath = new Path(hadoopPath.toUri()); - final FileSystem fs = FileSystem.get(filePath.toUri()); - - final FileStatus file = fs.getFileStatus(filePath); - latestModTime = Math.max(latestModTime, file.getModificationTime()); - - // enumerate all files and check their modification time stamp. - if (file.isDir()) { - FileStatus[] fss = fs.listStatus(filePath); - files.ensureCapacity(files.size() + fss.length); - - for (FileStatus s : fss) { - if (!s.isDir()) { - files.add(s); - latestModTime = Math.max(s.getModificationTime(), latestModTime); - } - } - } else { - files.add(file); - } - } - - // check whether the cached statistics are still valid, if we have any - if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) { - return cachedStats; - } - - // calculate the whole length - long len = 0; - for (FileStatus s : files) { - len += s.getLen(); - } - - // sanity check - if (len <= 0) { - len = BaseStatistics.SIZE_UNKNOWN; - } - - return new FileBaseStatistics(latestModTime, len, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN); - } - - // -------------------------------------------------------------------------------------------- - // Custom serialization methods - // -------------------------------------------------------------------------------------------- - - private void writeObject(ObjectOutputStream out) throws IOException { - out.writeUTF(this.mapreduceInputFormat.getClass().getName()); - out.writeUTF(this.keyClass.getName()); - out.writeUTF(this.valueClass.getName()); - this.configuration.write(out); - } - - @SuppressWarnings("unchecked") - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - String hadoopInputFormatClassName = in.readUTF(); - String keyClassName = in.readUTF(); - String valueClassName = in.readUTF(); - - org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration(); - configuration.readFields(in); - - if(this.configuration == null) { - this.configuration = configuration; - } - - try { - this.mapreduceInputFormat = (org.apache.hadoop.mapreduce.InputFormat<K,V>) Class.forName(hadoopInputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance(); - } catch (Exception e) { - throw new RuntimeException("Unable to instantiate the hadoop input format", e); - } - try { - this.keyClass = (Class<K>) Class.forName(keyClassName, true, Thread.currentThread().getContextClassLoader()); - } catch (Exception e) { - throw new RuntimeException("Unable to find key class.", e); - } - try { - this.valueClass = (Class<V>) Class.forName(valueClassName, true, Thread.currentThread().getContextClassLoader()); - } catch (Exception e) { - throw new RuntimeException("Unable to find value class.", e); - } - } - - // -------------------------------------------------------------------------------------------- - // ResultTypeQueryable - // -------------------------------------------------------------------------------------------- - - @Override - public TypeInformation<Tuple2<K,V>> getProducedType() { - return new TupleTypeInfo<Tuple2<K,V>>(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass)); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java deleted file mode 100644 index 696e1be..0000000 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.hadoopcompatibility.mapreduce; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; - -import org.apache.flink.api.common.io.FinalizeOnMaster; -import org.apache.flink.api.common.io.OutputFormat; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.hadoopcompatibility.mapreduce.utils.HadoopUtils; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; - - -public class HadoopOutputFormat<K,V> implements OutputFormat<Tuple2<K,V>>, FinalizeOnMaster { - - private static final long serialVersionUID = 1L; - - private org.apache.hadoop.conf.Configuration configuration; - private org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat; - private transient RecordWriter<K,V> recordWriter; - private transient FileOutputCommitter fileOutputCommitter; - private transient TaskAttemptContext context; - private transient int taskNumber; - - public HadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat, Job job) { - super(); - this.mapreduceOutputFormat = mapreduceOutputFormat; - this.configuration = job.getConfiguration(); - HadoopUtils.mergeHadoopConf(configuration); - } - - public void setConfiguration(org.apache.hadoop.conf.Configuration configuration) { - this.configuration = configuration; - } - - public org.apache.hadoop.conf.Configuration getConfiguration() { - return this.configuration; - } - - public org.apache.hadoop.mapreduce.OutputFormat<K,V> getHadoopOutputFormat() { - return this.mapreduceOutputFormat; - } - - public void setHadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat) { - this.mapreduceOutputFormat = mapreduceOutputFormat; - } - - // -------------------------------------------------------------------------------------------- - // OutputFormat - // -------------------------------------------------------------------------------------------- - - @Override - public void configure(Configuration parameters) { - // nothing to do - } - - /** - * create the temporary output file for hadoop RecordWriter. - * @param taskNumber The number of the parallel instance. - * @param numTasks The number of parallel tasks. - * @throws IOException - */ - @Override - public void open(int taskNumber, int numTasks) throws IOException { - if (Integer.toString(taskNumber + 1).length() > 6) { - throw new IOException("Task id too large."); - } - - this.taskNumber = taskNumber+1; - - // for hadoop 2.2 - this.configuration.set("mapreduce.output.basename", "tmp"); - - TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_" - + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0") - + Integer.toString(taskNumber + 1) - + "_0"); - - this.configuration.set("mapred.task.id", taskAttemptID.toString()); - this.configuration.setInt("mapred.task.partition", taskNumber + 1); - // for hadoop 2.2 - this.configuration.set("mapreduce.task.attempt.id", taskAttemptID.toString()); - this.configuration.setInt("mapreduce.task.partition", taskNumber + 1); - - try { - this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID); - } catch (Exception e) { - throw new RuntimeException(e); - } - - this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), context); - - try { - this.fileOutputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration, new JobID())); - } catch (Exception e) { - throw new RuntimeException(e); - } - - // compatible for hadoop 2.2.0, the temporary output directory is different from hadoop 1.2.1 - this.configuration.set("mapreduce.task.output.dir", this.fileOutputCommitter.getWorkPath().toString()); - - try { - this.recordWriter = this.mapreduceOutputFormat.getRecordWriter(this.context); - } catch (InterruptedException e) { - throw new IOException("Could not create RecordWriter.", e); - } - } - - - @Override - public void writeRecord(Tuple2<K, V> record) throws IOException { - try { - this.recordWriter.write(record.f0, record.f1); - } catch (InterruptedException e) { - throw new IOException("Could not write Record.", e); - } - } - - /** - * commit the task by moving the output file out from the temporary directory. - * @throws IOException - */ - @Override - public void close() throws IOException { - try { - this.recordWriter.close(this.context); - } catch (InterruptedException e) { - throw new IOException("Could not close RecordReader.", e); - } - - if (this.fileOutputCommitter.needsTaskCommit(this.context)) { - this.fileOutputCommitter.commitTask(this.context); - } - - Path outputPath = new Path(this.configuration.get("mapred.output.dir")); - - // rename tmp-file to final name - FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration); - - String taskNumberStr = Integer.toString(this.taskNumber); - String tmpFileTemplate = "tmp-r-00000"; - String tmpFile = tmpFileTemplate.substring(0,11-taskNumberStr.length())+taskNumberStr; - - if(fs.exists(new Path(outputPath.toString()+"/"+tmpFile))) { - fs.rename(new Path(outputPath.toString()+"/"+tmpFile), new Path(outputPath.toString()+"/"+taskNumberStr)); - } - } - - @Override - public void finalizeGlobal(int parallelism) throws IOException { - - JobContext jobContext; - TaskAttemptContext taskContext; - try { - - TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_" - + String.format("%" + (6 - Integer.toString(1).length()) + "s"," ").replace(" ", "0") - + Integer.toString(1) - + "_0"); - - jobContext = HadoopUtils.instantiateJobContext(this.configuration, new JobID()); - taskContext = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID); - } catch (Exception e) { - throw new RuntimeException(e); - } - this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), taskContext); - - // finalize HDFS output format - this.fileOutputCommitter.commitJob(jobContext); - } - - // -------------------------------------------------------------------------------------------- - // Custom serialization methods - // -------------------------------------------------------------------------------------------- - - private void writeObject(ObjectOutputStream out) throws IOException { - out.writeUTF(this.mapreduceOutputFormat.getClass().getName()); - this.configuration.write(out); - } - - @SuppressWarnings("unchecked") - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - String hadoopOutputFormatClassName = in.readUTF(); - - org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration(); - configuration.readFields(in); - - if(this.configuration == null) { - this.configuration = configuration; - } - - try { - this.mapreduceOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat<K,V>) Class.forName(hadoopOutputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance(); - } catch (Exception e) { - throw new RuntimeException("Unable to instantiate the hadoop output format", e); - } - } -}
