[FLINK-4315] [dataSet] [hadoopCompat] Annotate Hadoop-related methods in ExecutionEnvironment as @Deprecated.
- Preparation to remove Hadoop dependency from flink-java - Alternatives for deprecated functionality are provided in flink-hadoop-compatibility via HadoopInputs. This closes #2637. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7d61e1f2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7d61e1f2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7d61e1f2 Branch: refs/heads/master Commit: 7d61e1f2fd0c9b0e3719b2d7252a164cfdf941c4 Parents: 4565170 Author: Evgeny_Kincharov <[email protected]> Authored: Fri Oct 14 17:19:22 2016 +0400 Committer: Fabian Hueske <[email protected]> Committed: Wed Nov 2 18:31:25 2016 +0100 ---------------------------------------------------------------------- .../flink-hadoop-compatibility/pom.xml | 98 +++++++++++++ .../flink/hadoopcompatibility/HadoopInputs.java | 118 +++++++++++++++ .../flink/hadoopcompatibility/HadoopUtils.java | 52 +++++++ .../scala/HadoopInputs.scala | 143 +++++++++++++++++++ .../hadoopcompatibility/HadoopUtilsTest.java | 34 +++++ flink-java/pom.xml | 2 +- .../flink/api/java/ExecutionEnvironment.java | 33 ++++- .../flink/api/java/utils/ParameterTool.java | 3 + .../java/utils/AbstractParameterToolTest.java | 71 +++++++++ .../flink/api/java/utils/ParameterToolTest.java | 46 +----- .../flink/api/scala/ExecutionEnvironment.scala | 37 ++++- .../hadoop/mapred/WordCountMapredITCase.java | 16 ++- .../mapreduce/WordCountMapreduceITCase.java | 17 ++- .../hadoop/mapred/WordCountMapredITCase.scala | 35 +++-- .../mapreduce/WordCountMapreduceITCase.scala | 26 +++- 15 files changed, 661 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-batch-connectors/flink-hadoop-compatibility/pom.xml ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/pom.xml 
b/flink-batch-connectors/flink-hadoop-compatibility/pom.xml index aa818f6..8143a03 100644 --- a/flink-batch-connectors/flink-hadoop-compatibility/pom.xml +++ b/flink-batch-connectors/flink-hadoop-compatibility/pom.xml @@ -47,6 +47,21 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-scala_2.10</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>${shading-artifact.name}</artifactId> <version>${project.version}</version> </dependency> @@ -78,6 +93,89 @@ under the License. <groupId>com.github.siom79.japicmp</groupId> <artifactId>japicmp-maven-plugin</artifactId> </plugin> + <!-- Scala Compiler --> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>3.1.4</version> + <executions> + <!-- Run scala compiler in the process-resources phase, so that dependencies on + scala classes can be resolved later in the (Java) compile phase --> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + </executions> + <configuration> + <jvmArgs> + <jvmArg>-Xms128m</jvmArg> + <jvmArg>-Xmx512m</jvmArg> + </jvmArgs> + </configuration> + </plugin> + + <!-- Eclipse Integration --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-eclipse-plugin</artifactId> + <version>2.8</version> + <configuration> + <downloadSources>true</downloadSources> + <projectnatures> + <projectnature>org.scala-ide.sdt.core.scalanature</projectnature> + <projectnature>org.eclipse.jdt.core.javanature</projectnature> + </projectnatures> + <buildcommands> + 
<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand> + </buildcommands> + <classpathContainers> + <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer> + <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> + </classpathContainers> + <excludes> + <exclude>org.scala-lang:scala-library</exclude> + <exclude>org.scala-lang:scala-compiler</exclude> + </excludes> + <sourceIncludes> + <sourceInclude>**/*.scala</sourceInclude> + <sourceInclude>**/*.java</sourceInclude> + </sourceIncludes> + </configuration> + </plugin> + + <!-- Adding scala source directories to build path --> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.7</version> + <executions> + <!-- Add src/main/scala to eclipse build path --> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/main/scala</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + + <!-- Scala Code Style, most of the configuration done via plugin management --> + <plugin> + <groupId>org.scalastyle</groupId> + <artifactId>scalastyle-maven-plugin</artifactId> + <configuration> + <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation> + </configuration> + </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java new file mode 100644 index 0000000..9e8a3e4 --- 
/dev/null +++ b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.hadoopcompatibility; + +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; + +import java.io.IOException; + +/** + * HadoopInputs is a utility class to use Apache Hadoop InputFormats with Apache Flink. + * + * It provides methods to create Flink InputFormat wrappers for Hadoop {@link org.apache.hadoop.mapred.InputFormat} + * and {@link org.apache.hadoop.mapreduce.InputFormat}. + * + * Key value pairs produced by the Hadoop InputFormats are converted into Flink + * {@link org.apache.flink.api.java.tuple.Tuple2 Tuple2} objects where the first field + * ({@link org.apache.flink.api.java.tuple.Tuple2#f0 Tuple2.f0}) is the key and the second field + * ({@link org.apache.flink.api.java.tuple.Tuple2#f1 Tuple2.f1}) is the value. 
+ * + */ + +public final class HadoopInputs { + // ----------------------------------- Hadoop Input Format --------------------------------------- + + /** + * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapred.FileInputFormat}. + * + * @return A Flink InputFormat that wraps the Hadoop FileInputFormat. + */ + public static <K,V> HadoopInputFormat<K, V> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) { + // set input path in JobConf + org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath)); + // return wrapping InputFormat + return createHadoopInput(mapredInputFormat, key, value, job); + } + + /** + * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapred.FileInputFormat}. + * + * @return A Flink InputFormat that wraps the Hadoop FileInputFormat. + */ + public static <K,V> HadoopInputFormat<K, V> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) { + return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf()); + } + + /** + * Creates a Flink {@link InputFormat} to read a Hadoop sequence file for the given key and value classes. + * + * @return A Flink InputFormat that wraps a Hadoop SequenceFileInputFormat. + */ + public static <K,V> HadoopInputFormat<K, V> readSequenceFile(Class<K> key, Class<V> value, String inputPath) throws IOException { + return readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat<K, V>(), key, value, inputPath); + } + + /** + * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapred.InputFormat}. + * + * @return A Flink InputFormat that wraps the Hadoop InputFormat. 
+ */ + public static <K,V> HadoopInputFormat<K, V> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) { + return new HadoopInputFormat<>(mapredInputFormat, key, value, job); + } + + /** + * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. + * + * @return A Flink InputFormat that wraps the Hadoop FileInputFormat. + */ + public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile( + org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException + { + // set input path in Job + org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath)); + // return wrapping InputFormat + return createHadoopInput(mapreduceInputFormat, key, value, job); + } + + /** + * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. + * + * @return A Flink InputFormat that wraps the Hadoop FileInputFormat. + */ + public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile( + org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException + { + return readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance()); + } + + /** + * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapreduce.InputFormat}. + * + * @return A Flink InputFormat that wraps the Hadoop InputFormat. 
+ */ + public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> createHadoopInput( + org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) + { + return new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<>(mapreduceInputFormat, key, value, job); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java new file mode 100644 index 0000000..97ca329 --- /dev/null +++ b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.hadoopcompatibility; + +import org.apache.commons.cli.Option; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.hadoop.util.GenericOptionsParser; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * Utility class to work with Apache Hadoop libraries. + */ +public class HadoopUtils { + /** + * Returns {@link ParameterTool} for the arguments parsed by {@link GenericOptionsParser} + * + * @param args Input array arguments. It should be parsable by {@link GenericOptionsParser} + * @return A {@link ParameterTool} + * @throws IOException If arguments cannot be parsed by {@link GenericOptionsParser} + * @see GenericOptionsParser + */ + public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException { + Option[] options = new GenericOptionsParser(args).getCommandLine().getOptions(); + Map<String, String> map = new HashMap<String, String>(); + for (Option option : options) { + String[] split = option.getValue().split("="); + map.put(split[0], split[1]); + } + return ParameterTool.fromMap(map); + } +} + + http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala b/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala new file mode 100644 index 0000000..133a5f4 --- /dev/null +++ b/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.hadoopcompatibility.scala + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.hadoop.mapreduce +import org.apache.flink.api.scala.hadoop.mapred +import org.apache.hadoop.fs.{Path => HadoopPath} +import org.apache.hadoop.mapred.{JobConf, FileInputFormat => MapredFileInputFormat, InputFormat => MapredInputFormat} +import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => MapreduceFileInputFormat} +import org.apache.hadoop.mapreduce.{Job, InputFormat => MapreduceInputFormat} + +/** + * HadoopInputs is a utility class to use Apache Hadoop InputFormats with Apache Flink. + * + * It provides methods to create Flink InputFormat wrappers for Hadoop + * [[org.apache.hadoop.mapred.InputFormat]] and [[org.apache.hadoop.mapreduce.InputFormat]]. + * + * Key value pairs produced by the Hadoop InputFormats are converted into [[Tuple2]] where + * the first field is the key and the second field is the value. + * + */ +object HadoopInputs { + + /** + * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop + * [[org.apache.hadoop.mapred.FileInputFormat]]. 
+ */ + def readHadoopFile[K, V]( + mapredInputFormat: MapredFileInputFormat[K, V], + key: Class[K], + value: Class[V], + inputPath: String, + job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = { + + // set input path in JobConf + MapredFileInputFormat.addInputPath(job, new HadoopPath(inputPath)) + // wrap mapredInputFormat + createHadoopInput(mapredInputFormat, key, value, job) + } + + /** + * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop + * [[org.apache.hadoop.mapred.FileInputFormat]]. + */ + def readHadoopFile[K, V]( + mapredInputFormat: MapredFileInputFormat[K, V], + key: Class[K], + value: Class[V], + inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = { + + readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf) + } + + /** + * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that reads a Hadoop sequence + * file with the given key and value classes. + */ + def readSequenceFile[K, V]( + key: Class[K], + value: Class[V], + inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = { + + readHadoopFile( + new org.apache.hadoop.mapred.SequenceFileInputFormat[K, V], + key, + value, + inputPath + ) + } + + /** + * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop + * [[org.apache.hadoop.mapred.InputFormat]]. + */ + def createHadoopInput[K, V]( + mapredInputFormat: MapredInputFormat[K, V], + key: Class[K], + value: Class[V], + job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = { + + new mapred.HadoopInputFormat[K, V](mapredInputFormat, key, value, job) + } + + /** + * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop + * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]]. 
+ */ + def readHadoopFile[K, V]( + mapreduceInputFormat: MapreduceFileInputFormat[K, V], + key: Class[K], + value: Class[V], + inputPath: String, + job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] = { + + // set input path in Job + MapreduceFileInputFormat.addInputPath(job, new HadoopPath(inputPath)) + // wrap mapreduceInputFormat + createHadoopInput(mapreduceInputFormat, key, value, job) + } + + /** + * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop + * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]]. + */ + def readHadoopFile[K, V]( + mapreduceInputFormat: MapreduceFileInputFormat[K, V], + key: Class[K], + value: Class[V], + inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] = + { + readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance) + } + + /** + * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop + * [[org.apache.hadoop.mapreduce.InputFormat]]. 
+ */ + def createHadoopInput[K, V]( + mapreduceInputFormat: MapreduceInputFormat[K, V], + key: Class[K], + value: Class[V], + job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] = { + + new mapreduce.HadoopInputFormat[K, V](mapreduceInputFormat, key, value, job) + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java new file mode 100644 index 0000000..6f7673b --- /dev/null +++ b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.hadoopcompatibility; + +import org.apache.flink.api.java.utils.AbstractParameterToolTest; +import org.apache.flink.api.java.utils.ParameterTool; +import org.junit.Test; + +import java.io.IOException; + +public class HadoopUtilsTest extends AbstractParameterToolTest { + + @Test + public void testParamsFromGenericOptionsParser() throws IOException { + ParameterTool parameter = HadoopUtils.paramsFromGenericOptionsParser(new String[]{"-D", "input=myInput", "-DexpectedCount=15"}); + validate(parameter); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-java/pom.xml ---------------------------------------------------------------------- diff --git a/flink-java/pom.xml b/flink-java/pom.xml index 6924da8..5bc81c6 100644 --- a/flink-java/pom.xml +++ b/flink-java/pom.xml @@ -84,7 +84,7 @@ under the License. <artifactId>japicmp-maven-plugin</artifactId> </plugin> - <!-- Because flink-scala and flink-avro uses it in tests --> + <!-- Because flink-scala, flink-avro and flink-hadoop-compatibility uses it in tests --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index add8531..964eed1 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -585,9 +585,12 @@ public abstract class ExecutionEnvironment { // ----------------------------------- Hadoop Input Format --------------------------------------- /** - * Creates a {@link DataSet} from the given {@link 
org.apache.hadoop.mapred.FileInputFormat}. The - * given inputName is set on the given job. + * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. + * + * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V>, Class<K>, Class<V>, String, JobConf)} + * from the flink-hadoop-compatibility module. */ + @Deprecated @PublicEvolving public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) { DataSource<Tuple2<K, V>> result = createHadoopInput(mapredInputFormat, key, value, job); @@ -600,7 +603,11 @@ public abstract class ExecutionEnvironment { /** * Creates a {@link DataSet} from {@link org.apache.hadoop.mapred.SequenceFileInputFormat} * A {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created. - */ + * + * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#readSequenceFile(Class<K>, Class<V>, String)} + * from the flink-hadoop-compatibility module. + */ + @Deprecated @PublicEvolving public <K,V> DataSource<Tuple2<K, V>> readSequenceFile(Class<K> key, Class<V> value, String inputPath) throws IOException { return readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat<K, V>(), key, value, inputPath); @@ -609,7 +616,11 @@ public abstract class ExecutionEnvironment { /** * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. A * {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created. + * + * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V>, Class<K>, Class<V>, String)} + * from the flink-hadoop-compatibility module. 
*/ + @Deprecated @PublicEvolving public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) { return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf()); @@ -617,7 +628,11 @@ public abstract class ExecutionEnvironment { /** * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.InputFormat}. + * + * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V>, Class<K>, Class<V>, JobConf)} + * from the flink-hadoop-compatibility module. */ + @Deprecated @PublicEvolving public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) { HadoopInputFormat<K, V> hadoopInputFormat = new HadoopInputFormat<>(mapredInputFormat, key, value, job); @@ -628,7 +643,11 @@ public abstract class ExecutionEnvironment { /** * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. The * given inputName is set on the given job. + * + * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V>, Class<K>, Class<V>, String, Job)} + * from the flink-hadoop-compatibility module. */ + @Deprecated @PublicEvolving public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException { DataSource<Tuple2<K, V>> result = createHadoopInput(mapreduceInputFormat, key, value, job); @@ -642,7 +661,11 @@ public abstract class ExecutionEnvironment { /** * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. 
A * {@link org.apache.hadoop.mapreduce.Job} with the given inputPath is created. + * + * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V>, Class<K>, Class<V>, String)} + * from the flink-hadoop-compatibility module. */ + @Deprecated @PublicEvolving public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException { return readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance()); @@ -650,7 +673,11 @@ public abstract class ExecutionEnvironment { /** * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.InputFormat}. + * + * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K,V>, Class<K>, Class<V>, Job)} + * from the flink-hadoop-compatibility module. 
*/ + @Deprecated @PublicEvolving public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) { org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> hadoopInputFormat = new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<>(mapreduceInputFormat, key, value, job); http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java index 8f504e4..8e15441 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java @@ -191,7 +191,10 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement * @return A {@link ParameterTool} * @throws IOException If arguments cannot be parsed by {@link GenericOptionsParser} * @see GenericOptionsParser + * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopUtils#paramsFromGenericOptionsParser(String[])} + * from project flink-hadoop-compatibility */ + @Deprecated @PublicEvolving public static ParameterTool fromGenericOptionsParser(String[] args) throws IOException { Option[] options = new GenericOptionsParser(args).getCommandLine().getOptions(); http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java b/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java new file 
mode 100644 index 0000000..9aa9a95 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.utils; + +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.configuration.Configuration; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; + +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Properties; + +public abstract class AbstractParameterToolTest { + + @Rule + public TemporaryFolder tmp = new TemporaryFolder(); + + protected void validate(ParameterTool parameter) { + ClosureCleaner.ensureSerializable(parameter); + Assert.assertEquals("myInput", parameter.getRequired("input")); + Assert.assertEquals("myDefaultValue", parameter.get("output", "myDefaultValue")); + Assert.assertEquals(null, parameter.get("whatever")); + Assert.assertEquals(15L, parameter.getLong("expectedCount", -1L)); + Assert.assertTrue(parameter.getBoolean("thisIsUseful", true)); + Assert.assertEquals(42, parameter.getByte("myDefaultByte", (byte) 42)); + Assert.assertEquals(42, 
parameter.getShort("myDefaultShort", (short) 42)); + + Configuration config = parameter.getConfiguration(); + Assert.assertEquals(15L, config.getLong("expectedCount", -1L)); + + Properties props = parameter.getProperties(); + Assert.assertEquals("myInput", props.getProperty("input")); + props = null; + + // -------- test the default file creation ------------ + try { + String pathToFile = tmp.newFile().getAbsolutePath(); + parameter.createPropertiesFile(pathToFile); + Properties defaultProps = new Properties(); + try (FileInputStream fis = new FileInputStream(pathToFile)) { + defaultProps.load(fis); + } + + Assert.assertEquals("myDefaultValue", defaultProps.get("output")); + Assert.assertEquals("-1", defaultProps.get("expectedCount")); + Assert.assertTrue(defaultProps.containsKey("input")); + + } catch (IOException e) { + Assert.fail(e.getMessage()); + e.printStackTrace(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java b/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java index 605f033..9b63985 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java @@ -18,25 +18,17 @@ package org.apache.flink.api.java.utils; -import org.apache.flink.api.java.ClosureCleaner; -import org.apache.flink.configuration.Configuration; import org.junit.Assert; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import java.io.File; -import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.util.Map; import java.util.Properties; -public class ParameterToolTest { - - 
@Rule - public TemporaryFolder tmp = new TemporaryFolder(); +public class ParameterToolTest extends AbstractParameterToolTest { // ----- Parser tests ----------------- @@ -162,40 +154,4 @@ public class ParameterToolTest { ParameterTool parameter = ParameterTool.fromGenericOptionsParser(new String[]{"-D", "input=myInput", "-DexpectedCount=15"}); validate(parameter); } - - private void validate(ParameterTool parameter) { - ClosureCleaner.ensureSerializable(parameter); - Assert.assertEquals("myInput", parameter.getRequired("input")); - Assert.assertEquals("myDefaultValue", parameter.get("output", "myDefaultValue")); - Assert.assertEquals(null, parameter.get("whatever")); - Assert.assertEquals(15L, parameter.getLong("expectedCount", -1L)); - Assert.assertTrue(parameter.getBoolean("thisIsUseful", true)); - Assert.assertEquals(42, parameter.getByte("myDefaultByte", (byte) 42)); - Assert.assertEquals(42, parameter.getShort("myDefaultShort", (short) 42)); - - Configuration config = parameter.getConfiguration(); - Assert.assertEquals(15L, config.getLong("expectedCount", -1L)); - - Properties props = parameter.getProperties(); - Assert.assertEquals("myInput", props.getProperty("input")); - props = null; - - // -------- test the default file creation ------------ - try { - String pathToFile = tmp.newFile().getAbsolutePath(); - parameter.createPropertiesFile(pathToFile); - Properties defaultProps = new Properties(); - try (FileInputStream fis = new FileInputStream(pathToFile)) { - defaultProps.load(fis); - } - - Assert.assertEquals("myDefaultValue", defaultProps.get("output")); - Assert.assertEquals("-1", defaultProps.get("expectedCount")); - Assert.assertTrue(defaultProps.containsKey("input")); - - } catch (IOException e) { - Assert.fail(e.getMessage()); - e.printStackTrace(); - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala 
---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala index 4f9d569..18aab07 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala @@ -43,7 +43,7 @@ import scala.collection.JavaConverters._ import scala.reflect.ClassTag /** - * The ExecutionEnviroment is the context in which a program is executed. A local environment will + * The ExecutionEnvironment is the context in which a program is executed. A local environment will * cause execution in the current JVM, a remote environment will cause execution on a remote * cluster installation. * @@ -412,7 +412,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) { /** * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.FileInputFormat]]. The * given inputName is set on the given job. + * + * @deprecated Please use + * [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readHadoopFile]] + * from the flink-hadoop-compatibility module. */ + @Deprecated @PublicEvolving def readHadoopFile[K, V]( mapredInputFormat: MapredFileInputFormat[K, V], @@ -429,7 +434,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) { /** * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.FileInputFormat]]. A * [[org.apache.hadoop.mapred.JobConf]] with the given inputPath is created. + * + * @deprecated Please use + * [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readHadoopFile]] + * from the flink-hadoop-compatibility module. 
*/ + @Deprecated @PublicEvolving def readHadoopFile[K, V]( mapredInputFormat: MapredFileInputFormat[K, V], @@ -443,7 +453,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) { /** * Creates a [[DataSet]] from [[org.apache.hadoop.mapred.SequenceFileInputFormat]] * A [[org.apache.hadoop.mapred.JobConf]] with the given inputPath is created. + * + * @deprecated Please use + * [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readSequenceFile]] + * from the flink-hadoop-compatibility module. */ + @Deprecated @PublicEvolving def readSequenceFile[K, V]( key: Class[K], @@ -456,7 +471,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) { /** * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.InputFormat]]. + * + * @deprecated Please use + * [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#createHadoopInput]] + * from the flink-hadoop-compatibility module. */ + @Deprecated @PublicEvolving def createHadoopInput[K, V]( mapredInputFormat: MapredInputFormat[K, V], @@ -471,7 +491,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) { /** * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]]. * The given inputName is set on the given job. + * + * @deprecated Please use + * [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readHadoopFile]] + * from the flink-hadoop-compatibility module. */ + @Deprecated @PublicEvolving def readHadoopFile[K, V]( mapreduceInputFormat: MapreduceFileInputFormat[K, V], @@ -489,7 +514,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) { * Creates a [[DataSet]] from the given * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]]. A * [[org.apache.hadoop.mapreduce.Job]] with the given inputPath will be created. + * + * @deprecated Please use + * [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readHadoopFile]] + * from the flink-hadoop-compatibility module. 
*/ + @Deprecated @PublicEvolving def readHadoopFile[K, V]( mapreduceInputFormat: MapreduceFileInputFormat[K, V], @@ -502,7 +532,12 @@ class ExecutionEnvironment(javaEnv: JavaEnv) { /** * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapreduce.InputFormat]]. + * + * @deprecated Please use + * [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#createHadoopInput]] + * from the flink-hadoop-compatibility module. */ + @Deprecated @PublicEvolving def createHadoopInput[K, V]( mapreduceInputFormat: MapreduceInputFormat[K, V], http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java index a0e3468..80f311a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java @@ -24,6 +24,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; +import static org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile; import org.apache.flink.test.testdata.WordCountData; import org.apache.flink.test.util.JavaProgramTestBase; import org.apache.flink.util.Collector; @@ -52,11 +53,24 @@ public class WordCountMapredITCase extends JavaProgramTestBase { @Override protected void testProgram() throws Exception { + internalRun(true); + postSubmit(); + resultPath = getTempDirPath("result2"); + internalRun(false); + } + + private void internalRun(boolean isTestDeprecatedAPI) throws Exception { final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple2<LongWritable, Text>> input; - DataSet<Tuple2<LongWritable, Text>> input = env.readHadoopFile(new TextInputFormat(), + if (isTestDeprecatedAPI) { + input = env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, textPath); + } else { + input = env.createInput(readHadoopFile(new TextInputFormat(), + LongWritable.class, Text.class, textPath)); + } DataSet<String> text = input.map(new MapFunction<Tuple2<LongWritable, Text>, String>() { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java index fee49bf..3293770 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java @@ -24,6 +24,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; +import static org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile; import org.apache.flink.test.testdata.WordCountData; import org.apache.flink.test.util.JavaProgramTestBase; import org.apache.flink.util.Collector; @@ -52,11 +53,23 @@ public class WordCountMapreduceITCase extends JavaProgramTestBase { @Override protected void testProgram() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + internalRun(true); + postSubmit(); + resultPath = getTempDirPath("result2"); + internalRun(false); + } + private 
void internalRun(boolean isTestDeprecatedAPI) throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple2<LongWritable, Text>> input = env.readHadoopFile(new TextInputFormat(), + DataSet<Tuple2<LongWritable, Text>> input; + if (isTestDeprecatedAPI) { + input = env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, textPath); + } else { + input = env.createInput(readHadoopFile(new TextInputFormat(), + LongWritable.class, Text.class, textPath)); + } DataSet<String> text = input.map(new MapFunction<Tuple2<LongWritable, Text>, String>() { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala index b09ecc4..6b414d6 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala @@ -18,12 +18,12 @@ package org.apache.flink.api.scala.hadoop.mapred import org.apache.flink.api.scala._ - +import org.apache.flink.hadoopcompatibility.scala.HadoopInputs import org.apache.flink.test.testdata.WordCountData -import org.apache.flink.test.util.{TestBaseUtils, JavaProgramTestBase} +import org.apache.flink.test.util.{JavaProgramTestBase, TestBaseUtils} import org.apache.hadoop.fs.Path -import org.apache.hadoop.io.{Text, LongWritable} -import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat, TextInputFormat} +import org.apache.hadoop.io.{LongWritable, Text} +import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextInputFormat, TextOutputFormat} class 
WordCountMapredITCase extends JavaProgramTestBase { protected var textPath: String = null @@ -39,21 +39,27 @@ class WordCountMapredITCase extends JavaProgramTestBase { resultPath, Array[String](".", "_")) } - protected def testProgram() { + private def internalRun (testDeprecatedAPI: Boolean): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val input = - env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath) + if (testDeprecatedAPI) { + env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath) + } else { + env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat, classOf[LongWritable], + classOf[Text], textPath)) + } - val text = input map { _._2.toString } - val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } - .map { (_, 1) } + val counts = input + .map(_._2.toString) + .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty).map( (_, 1))) .groupBy(0) .sum(1) - val words = counts map { t => (new Text(t._1), new LongWritable(t._2)) } + val words = counts + .map( t => (new Text(t._1), new LongWritable(t._2)) ) - val hadoopOutputFormat = new HadoopOutputFormat[Text,LongWritable]( + val hadoopOutputFormat = new HadoopOutputFormat[Text, LongWritable]( new TextOutputFormat[Text, LongWritable], new JobConf) hadoopOutputFormat.getJobConf.set("mapred.textoutputformat.separator", " ") @@ -64,5 +70,12 @@ class WordCountMapredITCase extends JavaProgramTestBase { env.execute("Hadoop Compat WordCount") } + + protected def testProgram() { + internalRun(testDeprecatedAPI = true) + postSubmit() + resultPath = getTempDirPath("result2") + internalRun(testDeprecatedAPI = false) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/7d61e1f2/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala ---------------------------------------------------------------------- diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala index de2d376..e393d23 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala @@ -19,6 +19,7 @@ package org.apache.flink.api.scala.hadoop.mapreduce import org.apache.flink.api.scala._ +import org.apache.flink.hadoopcompatibility.scala.HadoopInputs import org.apache.flink.test.testdata.WordCountData import org.apache.flink.test.util.{TestBaseUtils, JavaProgramTestBase} import org.apache.hadoop.fs.Path @@ -42,21 +43,34 @@ class WordCountMapreduceITCase extends JavaProgramTestBase { } protected def testProgram() { + internalRun(testDeprecatedAPI = true) + postSubmit() + resultPath = getTempDirPath("result2") + internalRun(testDeprecatedAPI = false) + } + + private def internalRun (testDeprecatedAPI: Boolean): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val input = - env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath) + if (testDeprecatedAPI) { + env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath) + } else { + env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat, classOf[LongWritable], + classOf[Text], textPath)) + } - val text = input map { _._2.toString } - val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } - .map { (_, 1) } + val counts = input + .map(_._2.toString) + .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty).map( (_, 1))) .groupBy(0) .sum(1) - val words = counts map { t => (new Text(t._1), new LongWritable(t._2)) } + val words = counts + .map( t => (new Text(t._1), new LongWritable(t._2)) ) val job = Job.getInstance() - val hadoopOutputFormat = new 
HadoopOutputFormat[Text,LongWritable]( + val hadoopOutputFormat = new HadoopOutputFormat[Text, LongWritable]( new TextOutputFormat[Text, LongWritable], job) hadoopOutputFormat.getConfiguration.set("mapred.textoutputformat.separator", " ")
