http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java new file mode 100644 index 0000000..3a95d94 --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.flink.api.java.typeutils.runtime;

import com.esotericsoftware.kryo.Kryo;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.types.NormalizableKey;
import org.apache.flink.util.InstantiationUtil;
import org.apache.hadoop.io.Writable;
import org.objenesis.strategy.StdInstantiatorStrategy;

import java.io.IOException;

/**
 * A {@link TypeComparator} for Hadoop {@link Writable} types that are also {@link Comparable}.
 *
 * <p>Every comparison is delegated to the type's own {@code compareTo} and negated when the
 * comparator was created for descending order. Normalized keys are offered only when the
 * compared type implements {@link NormalizableKey}.
 *
 * @param <T> the compared type
 */
public class WritableComparator<T extends Writable & Comparable<T>> extends TypeComparator<T> {

    private static final long serialVersionUID = 1L;

    /** Class of the compared type; used to instantiate and copy values. */
    private Class<T> type;

    /** {@code true} for ascending order, {@code false} for descending order. */
    private final boolean ascendingComparison;

    /** Reference value; also used as the first scratch instance by {@link #compareSerialized}. */
    private transient T reference;

    /** Second scratch instance used by {@link #compareSerialized}. */
    private transient T tempReference;

    /** Lazily created Kryo instance used to copy the reference value. */
    private transient Kryo kryo;

    @SuppressWarnings("rawtypes")
    private final TypeComparator[] comparators = new TypeComparator[] {this};

    /**
     * Creates a comparator for the given type.
     *
     * @param ascending whether values are ordered ascending
     * @param type the class of the compared type
     */
    public WritableComparator(boolean ascending, Class<T> type) {
        this.type = type;
        this.ascendingComparison = ascending;
    }

    /** Returns the record's own {@code hashCode()}. */
    @Override
    public int hash(T record) {
        return record.hashCode();
    }

    /** Stores a Kryo-made copy of {@code toCompare} as the reference value. */
    @Override
    public void setReference(T toCompare) {
        checkKryoInitialized();

        WritableSerializer<T> serializer = new WritableSerializer<T>(type);
        reference = KryoUtils.copy(toCompare, kryo, serializer);
    }

    /** Checks the candidate against the reference using {@code equals}. */
    @Override
    public boolean equalToReference(T candidate) {
        return candidate.equals(reference);
    }

    /**
     * Compares the reference held by {@code referencedComparator} with this comparator's
     * reference. The other comparator's reference is the receiver of {@code compareTo}.
     */
    @Override
    public int compareToReference(TypeComparator<T> referencedComparator) {
        WritableComparator<T> other = (WritableComparator<T>) referencedComparator;
        return applySortOrder(other.reference.compareTo(reference));
    }

    /** Compares two values according to the configured sort order. */
    @Override
    public int compare(T first, T second) {
        return applySortOrder(first.compareTo(second));
    }

    /**
     * Deserializes one value from each source and compares them.
     *
     * <p>The first value is read into {@code reference}, so the content of a reference
     * previously set through {@link #setReference} is overwritten by this call.
     */
    @Override
    public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
        ensureReferenceInstantiated();
        ensureTempReferenceInstantiated();

        reference.readFields(firstSource);
        tempReference.readFields(secondSource);

        return applySortOrder(reference.compareTo(tempReference));
    }

    /** Normalized keys are supported only for types implementing {@link NormalizableKey}. */
    @Override
    public boolean supportsNormalizedKey() {
        return NormalizableKey.class.isAssignableFrom(type);
    }

    /** Returns the maximal normalized key length reported by an instance of the type. */
    @Override
    public int getNormalizeKeyLen() {
        ensureReferenceInstantiated();

        return ((NormalizableKey<?>) reference).getMaxNormalizedKeyLen();
    }

    /** A key is only a prefix when fewer bytes are available than the full key length. */
    @Override
    public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
        int fullKeyLength = getNormalizeKeyLen();
        return fullKeyLength > keyBytes;
    }

    /** Lets the record write its normalized key into the target segment. */
    @Override
    public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
        ((NormalizableKey<?>) record).copyNormalizedKey(target, offset, numBytes);
    }

    /** Normalized keys are inverted for descending order. */
    @Override
    public boolean invertNormalizedKey() {
        return !ascendingComparison;
    }

    /** Creates a new comparator with the same order and type; no reference state is carried over. */
    @Override
    public TypeComparator<T> duplicate() {
        return new WritableComparator<T>(ascendingComparison, type);
    }

    /** The record itself is the single key. */
    @Override
    public int extractKeys(Object record, Object[] target, int index) {
        target[index] = record;
        return 1;
    }

    @SuppressWarnings("rawtypes")
    @Override
    public TypeComparator[] getFlatComparators() {
        return comparators;
    }

    // --------------------------------------------------------------------------------------------
    // unsupported normalization
    // --------------------------------------------------------------------------------------------

    @Override
    public boolean supportsSerializationWithKeyNormalization() {
        return false;
    }

    @Override
    public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
        throw new UnsupportedOperationException();
    }

    @Override
    public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
        throw new UnsupportedOperationException();
    }

    // --------------------------------------------------------------------------------------------

    /** Negates a raw {@code compareTo} result when the comparator orders descending. */
    private int applySortOrder(int comparison) {
        if (ascendingComparison) {
            return comparison;
        }
        return -comparison;
    }

    /** Creates and configures the Kryo instance on first use. */
    private void checkKryoInitialized() {
        if (this.kryo != null) {
            return;
        }

        this.kryo = new Kryo();

        // fall back to Objenesis for classes the default strategy cannot instantiate
        Kryo.DefaultInstantiatorStrategy strategy = new Kryo.DefaultInstantiatorStrategy();
        strategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
        kryo.setInstantiatorStrategy(strategy);

        this.kryo.setAsmEnabled(true);
        this.kryo.register(type);
    }

    /** Instantiates {@code reference} if it has not been set yet. */
    private void ensureReferenceInstantiated() {
        if (reference != null) {
            return;
        }
        reference = InstantiationUtil.instantiate(type, Writable.class);
    }

    /** Instantiates {@code tempReference} if it has not been set yet. */
    private void ensureTempReferenceInstantiated() {
        if (tempReference != null) {
            return;
        }
        tempReference = InstantiationUtil.instantiate(type, Writable.class);
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java new file mode 100644 index 0000000..9036d75 --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.flink.api.java.typeutils.runtime;


import com.esotericsoftware.kryo.Kryo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.InstantiationUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.objenesis.strategy.StdInstantiatorStrategy;

import java.io.IOException;

/**
 * A {@link TypeSerializer} for Hadoop {@link Writable} types.
 *
 * <p>Serialization and deserialization use the type's own {@code write} and
 * {@code readFields} methods; object copies are made through Kryo.
 *
 * @param <T> the serialized type
 */
public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {

    private static final long serialVersionUID = 1L;

    /** Class of the serialized type. */
    private final Class<T> typeClass;

    /** Lazily created Kryo instance used for object copies. */
    private transient Kryo kryo;

    /** Lazily created scratch instance used by {@link #copy(DataInputView, DataOutputView)}. */
    private transient T copyInstance;

    /**
     * Creates a serializer for the given Writable class.
     *
     * @param typeClass the class of the serialized type
     */
    public WritableSerializer(Class<T> typeClass) {
        this.typeClass = typeClass;
    }

    /**
     * Creates a new instance of the type. For {@link NullWritable} the shared instance
     * obtained from {@code NullWritable.get()} is returned instead.
     */
    @SuppressWarnings("unchecked")
    @Override
    public T createInstance() {
        if (typeClass != NullWritable.class) {
            return InstantiationUtil.instantiate(typeClass);
        }
        return (T) NullWritable.get();
    }

    /** Returns a Kryo-made copy of {@code from}. */
    @Override
    public T copy(T from) {
        checkKryoInitialized();

        return KryoUtils.copy(from, kryo, this);
    }

    /** Copies {@code from} via Kryo, offering {@code reuse} as a target instance. */
    @Override
    public T copy(T from, T reuse) {
        checkKryoInitialized();

        return KryoUtils.copy(from, reuse, kryo, this);
    }

    /** Returns {@code -1}. */
    @Override
    public int getLength() {
        return -1;
    }

    /** Writes the record through its own {@code write} method. */
    @Override
    public void serialize(T record, DataOutputView target) throws IOException {
        record.write(target);
    }

    /** Deserializes into a freshly created instance. */
    @Override
    public T deserialize(DataInputView source) throws IOException {
        T fresh = createInstance();
        return deserialize(fresh, source);
    }

    /** Reads the fields of {@code reuse} from the source and returns {@code reuse}. */
    @Override
    public T deserialize(T reuse, DataInputView source) throws IOException {
        reuse.readFields(source);
        return reuse;
    }

    /** Copies one serialized record by reading it into a scratch instance and writing it out again. */
    @Override
    public void copy(DataInputView source, DataOutputView target) throws IOException {
        // the scratch instance is created once and reused by later calls
        if (copyInstance == null) {
            copyInstance = createInstance();
        }
        copyInstance.readFields(source);
        copyInstance.write(target);
    }

    @Override
    public boolean isImmutableType() {
        return false;
    }

    /** Creates a new serializer for the same type; the Kryo and scratch instances are not shared. */
    @Override
    public WritableSerializer<T> duplicate() {
        return new WritableSerializer<T>(typeClass);
    }

    // --------------------------------------------------------------------------------------------

    /** Creates and configures the Kryo instance on first use. */
    private void checkKryoInitialized() {
        if (this.kryo != null) {
            return;
        }

        this.kryo = new Kryo();

        // fall back to Objenesis for classes the default strategy cannot instantiate
        Kryo.DefaultInstantiatorStrategy strategy = new Kryo.DefaultInstantiatorStrategy();
        strategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
        kryo.setInstantiatorStrategy(strategy);

        this.kryo.setAsmEnabled(true);
        this.kryo.register(typeClass);
    }

    // --------------------------------------------------------------------------------------------

    @Override
    public int hashCode() {
        return this.typeClass.hashCode();
    }

    /** Two serializers are equal when they can equal each other and serialize the same class. */
    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof WritableSerializer)) {
            return false;
        }

        WritableSerializer<?> that = (WritableSerializer<?>) obj;
        return that.canEqual(this) && typeClass == that.typeClass;
    }

    @Override
    public boolean canEqual(Object obj) {
        return obj instanceof WritableSerializer;
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java new file mode 100644 index 0000000..9e8a3e4 --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.flink.hadoopcompatibility;

import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

/**
 * HadoopInputs is a utility class to use Apache Hadoop InputFormats with Apache Flink.
 *
 * <p>It provides methods to create Flink InputFormat wrappers for Hadoop
 * {@link org.apache.hadoop.mapred.InputFormat} and {@link org.apache.hadoop.mapreduce.InputFormat}.
 *
 * <p>Key value pairs produced by the Hadoop InputFormats are converted into Flink
 * {@link org.apache.flink.api.java.tuple.Tuple2 Tuple2} objects where the first field
 * ({@link org.apache.flink.api.java.tuple.Tuple2#f0 Tuple2.f0}) is the key and the second field
 * ({@link org.apache.flink.api.java.tuple.Tuple2#f1 Tuple2.f1}) is the value.
 */
public final class HadoopInputs {
    // ----------------------------------- Hadoop Input Format ---------------------------------------

    /**
     * Creates a Flink {@link InputFormat} that wraps the given Hadoop
     * {@link org.apache.hadoop.mapred.FileInputFormat}.
     *
     * <p>The input path is added to the given {@code job}, i.e. the passed JobConf is modified.
     *
     * @param mapredInputFormat the Hadoop (mapred API) FileInputFormat to wrap
     * @param key the class of the keys
     * @param value the class of the values
     * @param inputPath the path that is added to the job's input paths
     * @param job the JobConf handed to the wrapping InputFormat
     * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
     */
    public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
            org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
            Class<K> key,
            Class<V> value,
            String inputPath,
            JobConf job) {
        // register the input path in the JobConf before wrapping the format
        org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(inputPath);
        org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, path);

        return createHadoopInput(mapredInputFormat, key, value, job);
    }

    /**
     * Creates a Flink {@link InputFormat} that wraps the given Hadoop
     * {@link org.apache.hadoop.mapred.FileInputFormat}, using a newly created {@link JobConf}.
     *
     * @param mapredInputFormat the Hadoop (mapred API) FileInputFormat to wrap
     * @param key the class of the keys
     * @param value the class of the values
     * @param inputPath the path that is added to the job's input paths
     * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
     */
    public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
            org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
            Class<K> key,
            Class<V> value,
            String inputPath) {
        JobConf freshConf = new JobConf();
        return readHadoopFile(mapredInputFormat, key, value, inputPath, freshConf);
    }

    /**
     * Creates a Flink {@link InputFormat} to read a Hadoop sequence file for the given key and value classes.
     *
     * @param key the class of the keys
     * @param value the class of the values
     * @param inputPath the path of the sequence file input
     * @return A Flink InputFormat that wraps a Hadoop SequenceFileInputFormat.
     * @throws IOException declared by the signature
     */
    public static <K, V> HadoopInputFormat<K, V> readSequenceFile(Class<K> key, Class<V> value, String inputPath) throws IOException {
        org.apache.hadoop.mapred.SequenceFileInputFormat<K, V> sequenceFormat =
                new org.apache.hadoop.mapred.SequenceFileInputFormat<K, V>();
        return readHadoopFile(sequenceFormat, key, value, inputPath);
    }

    /**
     * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapred.InputFormat}.
     *
     * @param mapredInputFormat the Hadoop (mapred API) InputFormat to wrap
     * @param key the class of the keys
     * @param value the class of the values
     * @param job the JobConf handed to the wrapping InputFormat
     * @return A Flink InputFormat that wraps the Hadoop InputFormat.
     */
    public static <K, V> HadoopInputFormat<K, V> createHadoopInput(
            org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
            Class<K> key,
            Class<V> value,
            JobConf job) {
        return new HadoopInputFormat<>(mapredInputFormat, key, value, job);
    }

    /**
     * Creates a Flink {@link InputFormat} that wraps the given Hadoop
     * {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}.
     *
     * <p>The input path is added to the given {@code job}, i.e. the passed Job is modified.
     *
     * @param mapreduceInputFormat the Hadoop (mapreduce API) FileInputFormat to wrap
     * @param key the class of the keys
     * @param value the class of the values
     * @param inputPath the path that is added to the job's input paths
     * @param job the Job handed to the wrapping InputFormat
     * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
     * @throws IOException if the input path cannot be added to the job
     */
    public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
            org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
            Class<K> key,
            Class<V> value,
            String inputPath,
            Job job) throws IOException {
        // register the input path in the Job before wrapping the format
        org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(inputPath);
        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, path);

        return createHadoopInput(mapreduceInputFormat, key, value, job);
    }

    /**
     * Creates a Flink {@link InputFormat} that wraps the given Hadoop
     * {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}, using a Job obtained
     * from {@code Job.getInstance()}.
     *
     * @param mapreduceInputFormat the Hadoop (mapreduce API) FileInputFormat to wrap
     * @param key the class of the keys
     * @param value the class of the values
     * @param inputPath the path that is added to the job's input paths
     * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
     * @throws IOException if the Job cannot be created or the input path cannot be added
     */
    public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
            org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
            Class<K> key,
            Class<V> value,
            String inputPath) throws IOException {
        Job freshJob = Job.getInstance();
        return readHadoopFile(mapreduceInputFormat, key, value, inputPath, freshJob);
    }

    /**
     * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapreduce.InputFormat}.
     *
     * @param mapreduceInputFormat the Hadoop (mapreduce API) InputFormat to wrap
     * @param key the class of the keys
     * @param value the class of the values
     * @param job the Job handed to the wrapping InputFormat
     * @return A Flink InputFormat that wraps the Hadoop InputFormat.
     */
    public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> createHadoopInput(
            org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
            Class<K> key,
            Class<V> value,
            Job job) {
        return new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<>(mapreduceInputFormat, key, value, job);
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java new file mode 100644 index 0000000..97ca329 --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.flink.hadoopcompatibility;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Utility class to work with Apache Hadoop libraries.
 */
public class HadoopUtils {
    /**
     * Returns {@link ParameterTool} for the arguments parsed by {@link GenericOptionsParser}.
     *
     * <p>An option value of the form {@code key=value} (as passed with {@code -D}) is split at
     * the <em>first</em> {@code '='}, so the value part may itself contain {@code '='}
     * characters and may be empty. An option whose value contains no {@code '='} is stored
     * under the option's own name. Options without a value are ignored.
     *
     * @param args Input array arguments. It should be parsable by {@link GenericOptionsParser}
     * @return A {@link ParameterTool}
     * @throws IOException If arguments cannot be parsed by {@link GenericOptionsParser}
     * @see GenericOptionsParser
     */
    public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException {
        CommandLine commandLine = new GenericOptionsParser(args).getCommandLine();
        if (commandLine == null) {
            // NOTE(review): GenericOptionsParser appears to leave the command line unset when
            // parsing fails -- confirm against the Hadoop version in use. Report it as the
            // documented IOException rather than a NullPointerException.
            throw new IOException("Arguments could not be parsed by GenericOptionsParser.");
        }

        Map<String, String> map = new HashMap<String, String>();
        for (Option option : commandLine.getOptions()) {
            String value = option.getValue();
            if (value == null) {
                // option carries no argument, nothing to record
                continue;
            }

            // split at the first '=' only, so "key=a=b" keeps "a=b" and "key=" yields ""
            int separator = value.indexOf('=');
            if (separator >= 0) {
                map.put(value.substring(0, separator), value.substring(separator + 1));
            } else {
                // plain value without a key: file it under the option's name
                String name = option.getOpt() != null ? option.getOpt() : option.getLongOpt();
                map.put(name, value);
            }
        }
        return ParameterTool.fromMap(map);
    }
}


http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java new file mode 100644 index 0000000..ba8aa90 --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.flink.hadoopcompatibility.mapred;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reporter;

/**
 * This wrapper maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
 *
 * <p>All fields are transient. Java serialization is customized: only the mapper's class
 * and the JobConf are written, and the mapper is re-instantiated from its class when the
 * function is read back.
 */
@SuppressWarnings("rawtypes")
@Public
public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichFlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
        implements ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {

    private static final long serialVersionUID = 1L;

    /** The wrapped Hadoop mapper. */
    private transient Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapper;

    /** Configuration passed to the mapper in {@link #open(Configuration)}. */
    private transient JobConf jobConf;

    /** Hadoop collector forwarding mapper output to the current Flink collector. */
    private transient HadoopOutputCollector<KEYOUT, VALUEOUT> outputCollector;

    /** Reporter handed to the mapper on every call. */
    private transient Reporter reporter;

    /**
     * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
     * The Hadoop Mapper is configured with a newly created JobConf.
     *
     * @param hadoopMapper The Hadoop Mapper to wrap.
     */
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper) {
        this(hadoopMapper, new JobConf());
    }

    /**
     * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
     * The Hadoop Mapper is configured with the provided JobConf.
     *
     * @param hadoopMapper The Hadoop Mapper to wrap.
     * @param conf The JobConf that is used to configure the Hadoop Mapper.
     * @throws NullPointerException if the mapper or the JobConf is {@code null}
     */
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf) {
        if (hadoopMapper == null) {
            throw new NullPointerException("Mapper may not be null.");
        }
        if (conf == null) {
            throw new NullPointerException("JobConf may not be null.");
        }

        mapper = hadoopMapper;
        jobConf = conf;
    }

    /** Configures the mapper with the JobConf and creates the reporter and output collector. */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        mapper.configure(jobConf);

        reporter = new HadoopDummyReporter();
        outputCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
    }

    /** Passes the tuple's key ({@code f0}) and value ({@code f1}) to the wrapped mapper. */
    @Override
    public void flatMap(final Tuple2<KEYIN, VALUEIN> value, final Collector<Tuple2<KEYOUT, VALUEOUT>> out)
            throws Exception {
        // route the mapper's output into the collector of this invocation
        outputCollector.setFlinkCollector(out);
        mapper.map(value.f0, value.f1, outputCollector, reporter);
    }

    /**
     * Derives the produced tuple type from the third and fourth type parameter of the
     * wrapped {@link Mapper} implementation.
     */
    @SuppressWarnings("unchecked")
    @Override
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType() {
        Class<?> mapperType = mapper.getClass();
        Class<KEYOUT> keyOutClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Mapper.class, mapperType, 2);
        Class<VALUEOUT> valueOutClass = (Class<VALUEOUT>) TypeExtractor.getParameterType(Mapper.class, mapperType, 3);

        final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass(keyOutClass);
        final TypeInformation<VALUEOUT> valueTypeInfo = TypeExtractor.getForClass(valueOutClass);
        return new TupleTypeInfo<Tuple2<KEYOUT, VALUEOUT>>(keyTypeInfo, valueTypeInfo);
    }

    /**
     * Custom serialization methods.
     * Writes the mapper's class followed by the JobConf.
     *
     * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
     */
    private void writeObject(final ObjectOutputStream out) throws IOException {
        out.writeObject(mapper.getClass());
        jobConf.write(out);
    }

    /** Reads the mapper's class, instantiates it, then restores the JobConf (same order as written). */
    @SuppressWarnings("unchecked")
    private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
        Class<Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> mapperClass =
                (Class<Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>>) in.readObject();
        mapper = InstantiationUtil.instantiate(mapperClass);

        JobConf restoredConf = new JobConf();
        jobConf = restoredConf;
        restoredConf.readFields(in);
    }

}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java new file mode 100644 index 0000000..c1acc2b --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.flink.hadoopcompatibility.mapred;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

/**
 * This wrapper maps a Hadoop Reducer and Combiner (mapred API) to a combinable Flink GroupReduceFunction.
 *
 * <p>All fields are transient. Java serialization is customized: only the classes of the
 * reducer and the combiner and the JobConf are written, and both Hadoop functions are
 * re-instantiated from their classes when the function is read back.
 */
@SuppressWarnings("rawtypes")
@Public
public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichGroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
        implements GroupCombineFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYIN, VALUEIN>>,
                ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {

    private static final long serialVersionUID = 1L;

    /** The wrapped Hadoop reducer. */
    private transient Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reducer;

    /** The wrapped Hadoop combiner; its output types equal its input types. */
    private transient Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> combiner;

    /** Configuration passed to both Hadoop functions in {@link #open(Configuration)}. */
    private transient JobConf jobConf;

    /** Iterator shared by {@link #reduce} and {@link #combine} to feed values to the Hadoop functions. */
    private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator;

    /** Hadoop collector forwarding reducer output to the current Flink collector. */
    private transient HadoopOutputCollector<KEYOUT, VALUEOUT> reduceCollector;

    /** Hadoop collector forwarding combiner output to the current Flink collector. */
    private transient HadoopOutputCollector<KEYIN, VALUEIN> combineCollector;

    /** Reporter handed to both Hadoop functions on every call. */
    private transient Reporter reporter;

    /**
     * Maps two Hadoop Reducers (mapred API) to a combinable Flink GroupReduceFunction.
     * Both Hadoop Reducers are configured with a newly created JobConf.
     *
     * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
     * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
     */
    public HadoopReduceCombineFunction(
            Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
            Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner) {
        this(hadoopReducer, hadoopCombiner, new JobConf());
    }

    /**
     * Maps two Hadoop Reducers (mapred API) to a combinable Flink GroupReduceFunction.
     *
     * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
     * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
     * @param conf The JobConf that is used to configure both Hadoop Reducers.
     * @throws NullPointerException if the reducer, the combiner or the JobConf is {@code null}
     */
    public HadoopReduceCombineFunction(
            Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
            Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner,
            JobConf conf) {
        if (hadoopReducer == null) {
            throw new NullPointerException("Reducer may not be null.");
        }
        if (hadoopCombiner == null) {
            throw new NullPointerException("Combiner may not be null.");
        }
        if (conf == null) {
            throw new NullPointerException("JobConf may not be null.");
        }

        reducer = hadoopReducer;
        combiner = hadoopCombiner;
        jobConf = conf;
    }

    /**
     * Configures reducer and combiner with the JobConf and creates the reporter, the value
     * iterator (with a serializer for the reducer's input key type) and both collectors.
     */
    @SuppressWarnings("unchecked")
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        reducer.configure(jobConf);
        combiner.configure(jobConf);

        reporter = new HadoopDummyReporter();

        // the input key class is the first type parameter of the Reducer implementation
        Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
        TypeInformation<KEYIN> inKeyTypeInfo = TypeExtractor.getForClass(inKeyClass);
        TypeSerializer<KEYIN> keySerializer = inKeyTypeInfo.createSerializer(getRuntimeContext().getExecutionConfig());

        valueIterator = new HadoopTupleUnwrappingIterator<>(keySerializer);
        combineCollector = new HadoopOutputCollector<>();
        reduceCollector = new HadoopOutputCollector<>();
    }

    /** Runs the wrapped reducer on one group, using the key reported by the value iterator. */
    @Override
    public void reduce(final Iterable<Tuple2<KEYIN, VALUEIN>> values, final Collector<Tuple2<KEYOUT, VALUEOUT>> out)
            throws Exception {
        reduceCollector.setFlinkCollector(out);
        valueIterator.set(values.iterator());

        KEYIN currentKey = valueIterator.getCurrentKey();
        reducer.reduce(currentKey, valueIterator, reduceCollector, reporter);
    }

    /** Runs the wrapped combiner on one group, using the key reported by the value iterator. */
    @Override
    public void combine(final Iterable<Tuple2<KEYIN, VALUEIN>> values, final Collector<Tuple2<KEYIN, VALUEIN>> out) throws Exception {
        combineCollector.setFlinkCollector(out);
        valueIterator.set(values.iterator());

        KEYIN currentKey = valueIterator.getCurrentKey();
        combiner.reduce(currentKey, valueIterator, combineCollector, reporter);
    }

    /**
     * Derives the produced tuple type from the third and fourth type parameter of the
     * wrapped {@link Reducer} implementation.
     */
    @SuppressWarnings("unchecked")
    @Override
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType() {
        Class<?> reducerType = reducer.getClass();
        Class<KEYOUT> keyOutClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducerType, 2);
        Class<VALUEOUT> valueOutClass = (Class<VALUEOUT>) TypeExtractor.getParameterType(Reducer.class, reducerType, 3);

        final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass(keyOutClass);
        final TypeInformation<VALUEOUT> valueTypeInfo = TypeExtractor.getForClass(valueOutClass);
        return new TupleTypeInfo<>(keyTypeInfo, valueTypeInfo);
    }

    /**
     * Custom serialization methods.
     * Writes the reducer's class, the combiner's class and then the JobConf.
     *
     * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
     */
    private void writeObject(final ObjectOutputStream out) throws IOException {
        out.writeObject(reducer.getClass());
        out.writeObject(combiner.getClass());
        jobConf.write(out);
    }

    /** Restores reducer, combiner and JobConf in the order in which they were written. */
    @SuppressWarnings("unchecked")
    private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
        Class<Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> reducerClass =
                (Class<Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>>) in.readObject();
        reducer = InstantiationUtil.instantiate(reducerClass);

        Class<Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN>> combinerClass =
                (Class<Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN>>) in.readObject();
        combiner = InstantiationUtil.instantiate(combinerClass);

        JobConf restoredConf = new JobConf();
        jobConf = restoredConf;
        restoredConf.readFields(in);
    }

}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java new file 
mode 100644 index 0000000..55aea24 --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.hadoopcompatibility.mapred; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; + +import org.apache.flink.annotation.Public; +import org.apache.flink.api.common.functions.RichGroupReduceFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter; +import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector; +import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator; +import org.apache.flink.util.Collector; +import org.apache.flink.util.InstantiationUtil; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; + +/** + * This wrapper maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction. 
+ */ +@SuppressWarnings("rawtypes") +@Public +public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> + extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>> + implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable { + + private static final long serialVersionUID = 1L; + + private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer; + private transient JobConf jobConf; + + private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator; + private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector; + private transient Reporter reporter; + + /** + * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction. + * + * @param hadoopReducer The Hadoop Reducer to wrap. + */ + public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer) { + this(hadoopReducer, new JobConf()); + } + + /** + * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction. + * + * @param hadoopReducer The Hadoop Reducer to wrap. + * @param conf The JobConf that is used to configure the Hadoop Reducer. 
+ */ + public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf) { + if(hadoopReducer == null) { + throw new NullPointerException("Reducer may not be null."); + } + if(conf == null) { + throw new NullPointerException("JobConf may not be null."); + } + + this.reducer = hadoopReducer; + this.jobConf = conf; + } + + @SuppressWarnings("unchecked") + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + this.reducer.configure(jobConf); + + this.reporter = new HadoopDummyReporter(); + this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>(); + Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0); + TypeSerializer<KEYIN> keySerializer = TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig()); + this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(keySerializer); + } + + @Override + public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out) + throws Exception { + + reduceCollector.setFlinkCollector(out); + valueIterator.set(values.iterator()); + reducer.reduce(valueIterator.getCurrentKey(), valueIterator, reduceCollector, reporter); + } + + @SuppressWarnings("unchecked") + @Override + public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() { + Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2); + Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3); + + final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass); + final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass); + return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo); + } + + /** + * Custom serialization 
methods + * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a> + */ + private void writeObject(final ObjectOutputStream out) throws IOException { + + out.writeObject(reducer.getClass()); + jobConf.write(out); + } + + @SuppressWarnings("unchecked") + private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException { + + Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass = + (Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject(); + reducer = InstantiationUtil.instantiate(reducerClass); + + jobConf = new JobConf(); + jobConf.readFields(in); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java new file mode 100644 index 0000000..bfe03d3 --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.hadoopcompatibility.mapred.wrapper; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.Collector; +import org.apache.hadoop.mapred.OutputCollector; + +import java.io.IOException; + +/** + * A Hadoop OutputCollector that wraps a Flink OutputCollector. + * On each call of collect() the data is forwarded to the wrapped Flink collector. + */ +public final class HadoopOutputCollector<KEY,VALUE> implements OutputCollector<KEY,VALUE> { + + private Collector<Tuple2<KEY,VALUE>> flinkCollector; + + private final Tuple2<KEY,VALUE> outTuple = new Tuple2<KEY, VALUE>(); + + /** + * Set the wrapped Flink collector. + * + * @param flinkCollector The wrapped Flink OutputCollector. + */ + public void setFlinkCollector(Collector<Tuple2<KEY, VALUE>> flinkCollector) { + this.flinkCollector = flinkCollector; + } + + /** + * Use the wrapped Flink collector to collect a key-value pair for Flink. + * + * @param key the key to collect + * @param val the value to collect + * @throws IOException unexpected of key or value in key-value pair. 
+ */ + @Override + public void collect(final KEY key, final VALUE val) throws IOException { + this.outTuple.f0 = key; + this.outTuple.f1 = val; + this.flinkCollector.collect(outTuple); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java new file mode 100644 index 0000000..2d204b8 --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.hadoopcompatibility.mapred.wrapper; + +import java.util.Iterator; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator; +import org.apache.flink.api.java.tuple.Tuple2; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Wraps a Flink Tuple2 (key-value-pair) iterator into an iterator over the second (value) field. + */ +public class HadoopTupleUnwrappingIterator<KEY,VALUE> + extends TupleUnwrappingIterator<VALUE, KEY> implements java.io.Serializable { + + private static final long serialVersionUID = 1L; + + private final TypeSerializer<KEY> keySerializer; + + private transient Iterator<Tuple2<KEY,VALUE>> iterator; + + private transient KEY curKey; + private transient VALUE firstValue; + private transient boolean atFirst; + + public HadoopTupleUnwrappingIterator(TypeSerializer<KEY> keySerializer) { + this.keySerializer = checkNotNull(keySerializer); + } + + /** + * Set the Flink iterator to wrap. + * + * @param iterator The Flink iterator to wrap. 
+ */ + @Override + public void set(final Iterator<Tuple2<KEY,VALUE>> iterator) { + this.iterator = iterator; + if(this.hasNext()) { + final Tuple2<KEY, VALUE> tuple = iterator.next(); + this.curKey = keySerializer.copy(tuple.f0); + this.firstValue = tuple.f1; + this.atFirst = true; + } else { + this.atFirst = false; + } + } + + @Override + public boolean hasNext() { + if(this.atFirst) { + return true; + } + return iterator.hasNext(); + } + + @Override + public VALUE next() { + if(this.atFirst) { + this.atFirst = false; + return firstValue; + } + + final Tuple2<KEY, VALUE> tuple = iterator.next(); + return tuple.f1; + } + + public KEY getCurrentKey() { + return this.curKey; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala b/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala new file mode 100644 index 0000000..133a5f4 --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.hadoopcompatibility.scala + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.hadoop.mapreduce +import org.apache.flink.api.scala.hadoop.mapred +import org.apache.hadoop.fs.{Path => HadoopPath} +import org.apache.hadoop.mapred.{JobConf, FileInputFormat => MapredFileInputFormat, InputFormat => MapredInputFormat} +import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => MapreduceFileInputFormat} +import org.apache.hadoop.mapreduce.{Job, InputFormat => MapreduceInputFormat} + +/** + * HadoopInputs is a utility class to use Apache Hadoop InputFormats with Apache Flink. + * + * It provides methods to create Flink InputFormat wrappers for Hadoop + * [[org.apache.hadoop.mapred.InputFormat]] and [[org.apache.hadoop.mapreduce.InputFormat]]. + * + * Key value pairs produced by the Hadoop InputFormats are converted into [[Tuple2]] where + * the first field is the key and the second field is the value. + * + */ +object HadoopInputs { + + /** + * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop + * [[org.apache.hadoop.mapred.FileInputFormat]]. 
+ */ + def readHadoopFile[K, V]( + mapredInputFormat: MapredFileInputFormat[K, V], + key: Class[K], + value: Class[V], + inputPath: String, + job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = { + + // set input path in JobConf + MapredFileInputFormat.addInputPath(job, new HadoopPath(inputPath)) + // wrap mapredInputFormat + createHadoopInput(mapredInputFormat, key, value, job) + } + + /** + * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop + * [[org.apache.hadoop.mapred.FileInputFormat]]. + */ + def readHadoopFile[K, V]( + mapredInputFormat: MapredFileInputFormat[K, V], + key: Class[K], + value: Class[V], + inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = { + + readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf) + } + + /** + * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that reads a Hadoop sequence + * file with the given key and value classes. + */ + def readSequenceFile[K, V]( + key: Class[K], + value: Class[V], + inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = { + + readHadoopFile( + new org.apache.hadoop.mapred.SequenceFileInputFormat[K, V], + key, + value, + inputPath + ) + } + + /** + * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop + * [[org.apache.hadoop.mapred.InputFormat]]. + */ + def createHadoopInput[K, V]( + mapredInputFormat: MapredInputFormat[K, V], + key: Class[K], + value: Class[V], + job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = { + + new mapred.HadoopInputFormat[K, V](mapredInputFormat, key, value, job) + } + + /** + * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop + * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]]. 
+ */ + def readHadoopFile[K, V]( + mapreduceInputFormat: MapreduceFileInputFormat[K, V], + key: Class[K], + value: Class[V], + inputPath: String, + job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] = { + + // set input path in Job + MapreduceFileInputFormat.addInputPath(job, new HadoopPath(inputPath)) + // wrap mapreduceInputFormat + createHadoopInput(mapreduceInputFormat, key, value, job) + } + + /** + * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop + * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]]. + */ + def readHadoopFile[K, V]( + mapreduceInputFormat: MapreduceFileInputFormat[K, V], + key: Class[K], + value: Class[V], + inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] = + { + readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance) + } + + /** + * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop + * [[org.apache.hadoop.mapreduce.InputFormat]]. 
+ */ + def createHadoopInput[K, V]( + mapreduceInputFormat: MapreduceInputFormat[K, V], + key: Class[K], + value: Class[V], + job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] = { + + new mapreduce.HadoopInputFormat[K, V](mapreduceInputFormat, key, value, job) + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java new file mode 100644 index 0000000..2aefd9f --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparator; + +import org.junit.Test; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; + +import static org.junit.Assert.*; + +@SuppressWarnings("serial") +public class WritableExtractionTest { + + @Test + public void testDetectWritable() { + // writable interface itself must not be writable + assertFalse(TypeExtractor.isHadoopWritable(Writable.class)); + + // various forms of extension + assertTrue(TypeExtractor.isHadoopWritable(DirectWritable.class)); + assertTrue(TypeExtractor.isHadoopWritable(ViaInterfaceExtension.class)); + assertTrue(TypeExtractor.isHadoopWritable(ViaAbstractClassExtension.class)); + + // some non-writables + assertFalse(TypeExtractor.isHadoopWritable(String.class)); + assertFalse(TypeExtractor.isHadoopWritable(List.class)); + assertFalse(TypeExtractor.isHadoopWritable(WritableComparator.class)); + } + + @Test + public void testCreateWritableInfo() { + TypeInformation<DirectWritable> info1 = + TypeExtractor.createHadoopWritableTypeInfo(DirectWritable.class); + assertEquals(DirectWritable.class, info1.getTypeClass()); + + TypeInformation<ViaInterfaceExtension> info2 = + TypeExtractor.createHadoopWritableTypeInfo(ViaInterfaceExtension.class); + assertEquals(ViaInterfaceExtension.class, info2.getTypeClass()); + + TypeInformation<ViaAbstractClassExtension> info3 = + TypeExtractor.createHadoopWritableTypeInfo(ViaAbstractClassExtension.class); + assertEquals(ViaAbstractClassExtension.class, info3.getTypeClass()); + } + + @Test + public void testValidateTypeInfo() { + // validate unrelated type info + 
TypeExtractor.validateIfWritable(BasicTypeInfo.STRING_TYPE_INFO, String.class); + + // validate writable type info correctly + TypeExtractor.validateIfWritable(new WritableTypeInfo<>( + DirectWritable.class), DirectWritable.class); + TypeExtractor.validateIfWritable(new WritableTypeInfo<>( + ViaInterfaceExtension.class), ViaInterfaceExtension.class); + TypeExtractor.validateIfWritable(new WritableTypeInfo<>( + ViaAbstractClassExtension.class), ViaAbstractClassExtension.class); + + // incorrect case: not writable at all + try { + TypeExtractor.validateIfWritable(new WritableTypeInfo<>( + DirectWritable.class), String.class); + fail("should have failed with an exception"); + } catch (InvalidTypesException e) { + // expected + } + + // incorrect case: wrong writable + try { + TypeExtractor.validateIfWritable(new WritableTypeInfo<>( + ViaInterfaceExtension.class), DirectWritable.class); + fail("should have failed with an exception"); + } catch (InvalidTypesException e) { + // expected + } + } + + @Test + public void testExtractFromFunction() { + RichMapFunction<DirectWritable, DirectWritable> function = new RichMapFunction<DirectWritable, DirectWritable>() { + @Override + public DirectWritable map(DirectWritable value) throws Exception { + return null; + } + }; + + TypeInformation<DirectWritable> outType = + TypeExtractor.getMapReturnTypes(function, new WritableTypeInfo<>(DirectWritable.class)); + + assertTrue(outType instanceof WritableTypeInfo); + assertEquals(DirectWritable.class, outType.getTypeClass()); + } + + @Test + public void testExtractAsPartOfPojo() { + PojoTypeInfo<PojoWithWritable> pojoInfo = + (PojoTypeInfo<PojoWithWritable>) TypeExtractor.getForClass(PojoWithWritable.class); + + boolean foundWritable = false; + for (int i = 0; i < pojoInfo.getArity(); i++) { + PojoField field = pojoInfo.getPojoFieldAt(i); + String name = field.getField().getName(); + + if (name.equals("hadoopCitizen")) { + if (foundWritable) { + fail("already seen"); + } + foundWritable 
= true; + assertEquals(new WritableTypeInfo<>(DirectWritable.class), field.getTypeInformation()); + assertEquals(DirectWritable.class, field.getTypeInformation().getTypeClass()); + + } + } + + assertTrue("missed the writable type", foundWritable); + } + + @Test + public void testInputValidationError() { + + RichMapFunction<Writable, String> function = new RichMapFunction<Writable, String>() { + @Override + public String map(Writable value) throws Exception { + return null; + } + }; + + @SuppressWarnings("unchecked") + TypeInformation<Writable> inType = + (TypeInformation<Writable>) (TypeInformation<?>) new WritableTypeInfo<>(DirectWritable.class); + + try { + TypeExtractor.getMapReturnTypes(function, inType); + fail("exception expected"); + } + catch (InvalidTypesException e) { + // right + } + } + + // ------------------------------------------------------------------------ + // test type classes + // ------------------------------------------------------------------------ + + public interface ExtendedWritable extends Writable {} + + public static abstract class AbstractWritable implements Writable {} + + public static class DirectWritable implements Writable { + + @Override + public void write(DataOutput dataOutput) throws IOException {} + + @Override + public void readFields(DataInput dataInput) throws IOException {} + } + + public static class ViaInterfaceExtension implements ExtendedWritable { + + @Override + public void write(DataOutput dataOutput) throws IOException {} + + @Override + public void readFields(DataInput dataInput) throws IOException {} + } + + public static class ViaAbstractClassExtension extends AbstractWritable { + + @Override + public void write(DataOutput dataOutput) throws IOException {} + + @Override + public void readFields(DataInput dataInput) throws IOException {} + } + + public static class PojoWithWritable { + public String str; + public DirectWritable hadoopCitizen; + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java new file mode 100644 index 0000000..3d2b652 --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.hadoop.io.Writable; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class WritableInfoParserTest { + + @Test + public void testWritableType() { + TypeInformation<?> ti = TypeInfoParser.parse( + "Writable<org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyWritable>"); + + Assert.assertTrue(ti instanceof WritableTypeInfo<?>); + Assert.assertEquals(MyWritable.class, ((WritableTypeInfo<?>) ti).getTypeClass()); + } + + @Test + public void testPojoWithWritableType() { + TypeInformation<?> ti = TypeInfoParser.parse( + "org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyPojo<" + + "basic=Integer," + + "tuple=Tuple2<String, Integer>," + + "hadoopCitizen=Writable<org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyWritable>," + + "array=String[]" + + ">"); + Assert.assertTrue(ti instanceof PojoTypeInfo); + PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti; + Assert.assertEquals("array", pti.getPojoFieldAt(0).getField().getName()); + Assert.assertTrue(pti.getPojoFieldAt(0).getTypeInformation() instanceof BasicArrayTypeInfo); + Assert.assertEquals("basic", pti.getPojoFieldAt(1).getField().getName()); + Assert.assertTrue(pti.getPojoFieldAt(1).getTypeInformation() instanceof BasicTypeInfo); + Assert.assertEquals("hadoopCitizen", pti.getPojoFieldAt(2).getField().getName()); + Assert.assertTrue(pti.getPojoFieldAt(2).getTypeInformation() instanceof WritableTypeInfo); + Assert.assertEquals("tuple", pti.getPojoFieldAt(3).getField().getName()); + Assert.assertTrue(pti.getPojoFieldAt(3).getTypeInformation() instanceof TupleTypeInfo); + } + // 
------------------------------------------------------------------------ + // Test types + // ------------------------------------------------------------------------ + + public static class MyWritable implements Writable { + + @Override + public void write(DataOutput out) throws IOException {} + + @Override + public void readFields(DataInput in) throws IOException {} + } + + public static class MyPojo { + public Integer basic; + public Tuple2<String, Integer> tuple; + public MyWritable hadoopCitizen; + public String[] array; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java new file mode 100644 index 0000000..eb9cdf0 --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.util.TestLogger; +import org.apache.hadoop.io.Writable; +import org.junit.Test; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +public class WritableTypeInfoTest extends TestLogger { + + @Test + public void testWritableTypeInfoEquality() { + WritableTypeInfo<TestClass> tpeInfo1 = new WritableTypeInfo<>(TestClass.class); + WritableTypeInfo<TestClass> tpeInfo2 = new WritableTypeInfo<>(TestClass.class); + + assertEquals(tpeInfo1, tpeInfo2); + assertEquals(tpeInfo1.hashCode(), tpeInfo2.hashCode()); + } + + @Test + public void testWritableTypeInfoInequality() { + WritableTypeInfo<TestClass> tpeInfo1 = new WritableTypeInfo<>(TestClass.class); + WritableTypeInfo<AlternateClass> tpeInfo2 = new WritableTypeInfo<>(AlternateClass.class); + + assertNotEquals(tpeInfo1, tpeInfo2); + } + + // ------------------------------------------------------------------------ + // test types + // ------------------------------------------------------------------------ + + public static class TestClass implements Writable { + + @Override + public void write(DataOutput dataOutput) throws IOException {} + + @Override + public void readFields(DataInput dataInput) throws IOException {} + } + + public static class AlternateClass implements Writable { + + @Override + public void write(DataOutput dataOutput) throws IOException {} + + @Override + public void readFields(DataInput dataInput) throws IOException {} + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java ---------------------------------------------------------------------- diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java new file mode 100644 index 0000000..c32f5da --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.hadoop.io.Writable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class StringArrayWritable implements Writable, Comparable<StringArrayWritable> { + + private String[] array = new String[0]; + + public StringArrayWritable() { + super(); + } + + public StringArrayWritable(String[] array) { + this.array = array; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(this.array.length); + + for(String str : this.array) { + byte[] b = str.getBytes(); + out.writeInt(b.length); + out.write(b); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + this.array = new String[in.readInt()]; + + for(int i = 0; i < this.array.length; i++) { + byte[] b = new byte[in.readInt()]; + in.readFully(b); + this.array[i] = new String(b); + } + } + + @Override + public int compareTo(StringArrayWritable o) { + if(this.array.length != o.array.length) { + return this.array.length - o.array.length; + } + + for(int i = 0; i < this.array.length; i++) { + int comp = this.array[i].compareTo(o.array[i]); + if(comp != 0) { + return comp; + } + } + return 0; + } + + @Override + public boolean equals(Object obj) { + if(!(obj instanceof StringArrayWritable)) { + return false; + } + return this.compareTo((StringArrayWritable) obj) == 0; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java new file mode 100644 index 
0000000..96f844c --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +public class WritableComparatorTest extends ComparatorTestBase<StringArrayWritable> { + + StringArrayWritable[] data = new StringArrayWritable[]{ + new StringArrayWritable(new String[]{}), + new StringArrayWritable(new String[]{""}), + new StringArrayWritable(new String[]{"a","a"}), + new StringArrayWritable(new String[]{"a","b"}), + new StringArrayWritable(new String[]{"c","c"}), + new StringArrayWritable(new String[]{"d","f"}), + new StringArrayWritable(new String[]{"d","m"}), + new StringArrayWritable(new String[]{"z","x"}), + new StringArrayWritable(new String[]{"a","a", "a"}) + }; + + @Override + protected TypeComparator<StringArrayWritable> createComparator(boolean ascending) { + return new 
WritableComparator<StringArrayWritable>(ascending, StringArrayWritable.class); + } + + @Override + protected TypeSerializer<StringArrayWritable> createSerializer() { + return new WritableSerializer<StringArrayWritable>(StringArrayWritable.class); + } + + @Override + protected StringArrayWritable[] getSortedTestData() { + return data; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java new file mode 100644 index 0000000..94e759d --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +import java.util.UUID; + +public class WritableComparatorUUIDTest extends ComparatorTestBase<WritableID> { + @Override + protected TypeComparator<WritableID> createComparator(boolean ascending) { + return new WritableComparator<>(ascending, WritableID.class); + } + + @Override + protected TypeSerializer<WritableID> createSerializer() { + return new WritableSerializer<>(WritableID.class); + } + + @Override + protected WritableID[] getSortedTestData() { + return new WritableID[] { + new WritableID(new UUID(0, 0)), + new WritableID(new UUID(1, 0)), + new WritableID(new UUID(1, 1)) + }; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java new file mode 100644 index 0000000..4274cf6 --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.hadoop.io.WritableComparable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.UUID; + +public class WritableID implements WritableComparable<WritableID> { + private UUID uuid; + + public WritableID() { + this.uuid = UUID.randomUUID(); + } + + public WritableID(UUID uuid) { + this.uuid = uuid; + } + + @Override + public int compareTo(WritableID o) { + return this.uuid.compareTo(o.uuid); + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + dataOutput.writeLong(uuid.getMostSignificantBits()); + dataOutput.writeLong(uuid.getLeastSignificantBits()); + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + this.uuid = new UUID(dataInput.readLong(), dataInput.readLong()); + } + + @Override + public String toString() { + return uuid.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + WritableID id = (WritableID) o; + + return !(uuid != null ? !uuid.equals(id.uuid) : id.uuid != null); + } + + @Override + public int hashCode() { + return uuid != null ? 
uuid.hashCode() : 0; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java new file mode 100644 index 0000000..bb5f4d4 --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.SerializerTestInstance; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.WritableTypeInfo; +import org.junit.Test; + +public class WritableSerializerTest { + + @Test + public void testStringArrayWritable() { + StringArrayWritable[] data = new StringArrayWritable[]{ + new StringArrayWritable(new String[]{}), + new StringArrayWritable(new String[]{""}), + new StringArrayWritable(new String[]{"a","a"}), + new StringArrayWritable(new String[]{"a","b"}), + new StringArrayWritable(new String[]{"c","c"}), + new StringArrayWritable(new String[]{"d","f"}), + new StringArrayWritable(new String[]{"d","m"}), + new StringArrayWritable(new String[]{"z","x"}), + new StringArrayWritable(new String[]{"a","a", "a"}) + }; + + WritableTypeInfo<StringArrayWritable> writableTypeInfo = (WritableTypeInfo<StringArrayWritable>) TypeExtractor.getForObject(data[0]); + WritableSerializer<StringArrayWritable> writableSerializer = (WritableSerializer<StringArrayWritable>) writableTypeInfo.createSerializer(new ExecutionConfig()); + + SerializerTestInstance<StringArrayWritable> testInstance = new SerializerTestInstance<StringArrayWritable>(writableSerializer,writableTypeInfo.getTypeClass(), -1, data); + + testInstance.testAll(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java new file mode 
100644 index 0000000..2af7730 --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +import java.util.UUID; + +public class WritableSerializerUUIDTest extends SerializerTestBase<WritableID> { + @Override + protected TypeSerializer<WritableID> createSerializer() { + return new WritableSerializer<>(WritableID.class); + } + + @Override + protected int getLength() { + return -1; + } + + @Override + protected Class<WritableID> getTypeClass() { + return WritableID.class; + } + + @Override + protected WritableID[] getTestData() { + return new WritableID[] { + new WritableID(new UUID(0, 0)), + new WritableID(new UUID(1, 0)), + new WritableID(new UUID(1, 1)) + }; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java new file mode 100644 index 0000000..6f7673b --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.hadoopcompatibility; + +import org.apache.flink.api.java.utils.AbstractParameterToolTest; +import org.apache.flink.api.java.utils.ParameterTool; +import org.junit.Test; + +import java.io.IOException; + +public class HadoopUtilsTest extends AbstractParameterToolTest { + + @Test + public void testParamsFromGenericOptionsParser() throws IOException { + ParameterTool parameter = HadoopUtils.paramsFromGenericOptionsParser(new String[]{"-D", "input=myInput", "-DexpectedCount=15"}); + validate(parameter); + } +}
