http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java new file mode 100644 index 0000000..4d1acb4 --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.hadoopcompatibility.mapred; + +import java.io.IOException; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +
/**
 * Integration tests that wrap Hadoop {@code mapred} {@link Mapper} implementations in a
 * {@link HadoopMapFunction}, apply them as a Flink flatMap to the key/value pairs supplied by
 * {@code HadoopTestData}, write the result as text and compare it line by line with the
 * expected output.
 */
@RunWith(Parameterized.class)
public class HadoopMapFunctionITCase extends MultipleProgramsTestBase {

    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    public HadoopMapFunctionITCase(TestExecutionMode mode) {
        super(mode);
    }

    /** Creates a fresh temporary file and returns its URI in string form. */
    private String newResultPath() throws IOException {
        return tempFolder.newFile().toURI().toString();
    }

    /**
     * Runs {@link NonPassingMapper}, which only forwards values containing "bananas";
     * the comparison expects a result without any records.
     */
    @Test
    public void testNonPassingMapper() throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        final DataSet<Tuple2<IntWritable, Text>> input = HadoopTestData.getKVPairDataSet(env);
        final DataSet<Tuple2<IntWritable, Text>> filtered = input.flatMap(
                new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new NonPassingMapper()));

        final String resultPath = newResultPath();
        filtered.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
        env.execute();

        compareResultsByLinesInMemory("\n", resultPath);
    }

    /**
     * Runs {@link DuplicatingMapper} and expects every input pair twice: once unchanged and
     * once with the text upper-cased.
     */
    @Test
    public void testDataDuplicatingMapper() throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        final DataSet<Tuple2<IntWritable, Text>> input = HadoopTestData.getKVPairDataSet(env);
        final DataSet<Tuple2<IntWritable, Text>> duplicated = input.flatMap(
                new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new DuplicatingMapper()));

        final String resultPath = newResultPath();
        duplicated.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
        env.execute();

        final String expected =
                "(1,Hi)\n" + "(1,HI)\n" +
                "(2,Hello)\n" + "(2,HELLO)\n" +
                "(3,Hello world)\n" + "(3,HELLO WORLD)\n" +
                "(4,Hello world, how are you?)\n" + "(4,HELLO WORLD, HOW ARE YOU?)\n" +
                "(5,I am fine.)\n" + "(5,I AM FINE.)\n" +
                "(6,Luke Skywalker)\n" + "(6,LUKE SKYWALKER)\n" +
                "(7,Comment#1)\n" + "(7,COMMENT#1)\n" +
                "(8,Comment#2)\n" + "(8,COMMENT#2)\n" +
                "(9,Comment#3)\n" + "(9,COMMENT#3)\n" +
                "(10,Comment#4)\n" + "(10,COMMENT#4)\n" +
                "(11,Comment#5)\n" + "(11,COMMENT#5)\n" +
                "(12,Comment#6)\n" + "(12,COMMENT#6)\n" +
                "(13,Comment#7)\n" + "(13,COMMENT#7)\n" +
                "(14,Comment#8)\n" + "(14,COMMENT#8)\n" +
                "(15,Comment#9)\n" + "(15,COMMENT#9)\n" +
                "(16,Comment#10)\n" + "(16,COMMENT#10)\n" +
                "(17,Comment#11)\n" + "(17,COMMENT#11)\n" +
                "(18,Comment#12)\n" + "(18,COMMENT#12)\n" +
                "(19,Comment#13)\n" + "(19,COMMENT#13)\n" +
                "(20,Comment#14)\n" + "(20,COMMENT#14)\n" +
                "(21,Comment#15)\n" + "(21,COMMENT#15)\n";

        compareResultsByLinesInMemory(expected, resultPath);
    }

    /**
     * Passes a {@link JobConf} carrying {@code my.filterPrefix=Hello} to
     * {@link ConfigurableMapper} and expects only the values starting with that prefix.
     */
    @Test
    public void testConfigurableMapper() throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        final JobConf jobConf = new JobConf();
        jobConf.set("my.filterPrefix", "Hello");

        final DataSet<Tuple2<IntWritable, Text>> input = HadoopTestData.getKVPairDataSet(env);
        final DataSet<Tuple2<IntWritable, Text>> hellos = input.flatMap(
                new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new ConfigurableMapper(), jobConf));

        final String resultPath = newResultPath();
        hellos.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
        env.execute();

        final String expected =
                "(2,Hello)\n" +
                "(3,Hello world)\n" +
                "(4,Hello world, how are you?)\n";

        compareResultsByLinesInMemory(expected, resultPath);
    }

    /** Mapper that forwards a pair only when its text contains "bananas". */
    public static class NonPassingMapper implements Mapper<IntWritable, Text, IntWritable, Text> {

        @Override
        public void configure(final JobConf arg0) { }

        @Override
        public void map(final IntWritable key, final Text value,
                final OutputCollector<IntWritable, Text> collector, final Reporter reporter)
                throws IOException {
            final boolean mentionsBananas = value.toString().contains("bananas");
            if (!mentionsBananas) {
                return;
            }
            collector.collect(key, value);
        }

        @Override
        public void close() throws IOException { }
    }

    /** Mapper that emits each pair unchanged, followed by a copy with upper-cased text. */
    public static class DuplicatingMapper implements Mapper<IntWritable, Text, IntWritable, Text> {

        @Override
        public void configure(final JobConf arg0) { }

        @Override
        public void map(final IntWritable key, final Text value,
                final OutputCollector<IntWritable, Text> collector, final Reporter reporter)
                throws IOException {
            collector.collect(key, value);

            final Text shouted = new Text(value.toString().toUpperCase());
            collector.collect(key, shouted);
        }

        @Override
        public void close() throws IOException { }
    }

    /**
     * Mapper that forwards only pairs whose text starts with the prefix read from the
     * {@code my.filterPrefix} entry of the {@link JobConf} passed to {@link #configure(JobConf)}.
     */
    public static class ConfigurableMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
        private String filterPrefix;

        @Override
        public void configure(JobConf jobConf) {
            filterPrefix = jobConf.get("my.filterPrefix");
        }

        @Override
        public void map(IntWritable key, Text value, OutputCollector<IntWritable, Text> collector,
                Reporter reporter) throws IOException {
            final String text = value.toString();
            if (text.startsWith(filterPrefix)) {
                collector.collect(key, value);
            }
        }

        @Override
        public void close() throws IOException { }
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java new file mode 100644 index 0000000..ccc0d82 --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.hadoopcompatibility.mapred; + +import org.apache.flink.test.hadoopcompatibility.mapred.example.HadoopMapredCompatWordCount; +import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.JavaProgramTestBase; +
/**
 * Runs {@link HadoopMapredCompatWordCount} on the {@link WordCountData#TEXT} input and checks
 * the written result against {@link WordCountData#COUNTS}.
 */
public class HadoopMapredITCase extends JavaProgramTestBase {

    protected String textPath;
    protected String resultPath;

    /** Stores the input text in a temp file, picks the result location and sets parallelism 4. */
    @Override
    protected void preSubmit() throws Exception {
        this.textPath = createTempFile("text.txt", WordCountData.TEXT);
        this.resultPath = getTempDirPath("result");
        this.setParallelism(4);
    }

    /** Invokes the word count program with the input and result paths as its two arguments. */
    @Override
    protected void testProgram() throws Exception {
        final String[] programArgs = { textPath, resultPath };
        HadoopMapredCompatWordCount.main(programArgs);
    }

    /** Compares the produced lines with the expected counts. */
    @Override
    protected void postSubmit() throws Exception {
        // NOTE(review): presumably prefixes of result files to ignore - confirm against the base class.
        final String[] prefixes = new String[]{".", "_"};
        compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, prefixes);
    }

}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java new file mode 100644 index 0000000..13d971c --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.hadoopcompatibility.mapred; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction; +import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.hamcrest.core.IsEqual; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +
/**
 * Integration tests that run Hadoop {@code mapred} {@link Reducer} implementations inside Flink
 * group-reduce operations through {@link HadoopReduceCombineFunction} (and, in one test,
 * {@link HadoopReduceFunction}). Each test writes its result as text and compares it line by
 * line with the expected output.
 */
@RunWith(Parameterized.class)
public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBase {

    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    public HadoopReduceCombineFunctionITCase(TestExecutionMode mode) {
        super(mode);
    }

    /** Creates a fresh temporary file and returns its URI in string form. */
    private String newResultPath() throws IOException {
        return tempFolder.newFile().toURI().toString();
    }

    /** Groups on the key produced by {@link Mapper1} and sums the ones with {@link SumReducer}. */
    @Test
    public void testStandardCountingWithCombiner() throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        final DataSet<Tuple2<IntWritable, IntWritable>> keyed =
                HadoopTestData.getKVPairDataSet(env).map(new Mapper1());

        final DataSet<Tuple2<IntWritable, IntWritable>> counts = keyed
                .groupBy(0)
                .reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
                        new SumReducer(), new SumReducer()));

        final String resultPath = newResultPath();
        counts.writeAsText(resultPath);
        env.execute();

        final String expected =
                "(0,5)\n" +
                "(1,6)\n" +
                "(2,6)\n" +
                "(3,4)\n";

        compareResultsByLinesInMemory(expected, resultPath);
    }

    /** Applies the wrapped reducers without grouping; all original keys are summed under key 0. */
    @Test
    public void testUngroupedHadoopReducer() throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        final DataSet<Tuple2<IntWritable, IntWritable>> keyed =
                HadoopTestData.getKVPairDataSet(env).map(new Mapper2());

        final DataSet<Tuple2<IntWritable, IntWritable>> sum = keyed
                .reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
                        new SumReducer(), new SumReducer()));

        final String resultPath = newResultPath();
        sum.writeAsText(resultPath);
        env.execute();

        final String expected = "(0,231)\n";

        compareResultsByLinesInMemory(expected, resultPath);
    }

    /**
     * Uses {@link SumReducer} as first and {@link KeyChangingReducer} as second wrapped reducer.
     * The test is skipped unless the execution mode equals {@code CLUSTER}.
     */
    @Test
    public void testCombiner() throws Exception {
        org.junit.Assume.assumeThat(mode, new IsEqual<TestExecutionMode>(TestExecutionMode.CLUSTER));
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        final DataSet<Tuple2<IntWritable, IntWritable>> keyed =
                HadoopTestData.getKVPairDataSet(env).map(new Mapper3());

        final DataSet<Tuple2<IntWritable, IntWritable>> counts = keyed
                .groupBy(0)
                .reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
                        new SumReducer(), new KeyChangingReducer()));

        final String resultPath = newResultPath();
        counts.writeAsText(resultPath);
        env.execute();

        final String expected =
                "(0,5)\n" +
                "(1,6)\n" +
                "(2,5)\n" +
                "(3,5)\n";

        compareResultsByLinesInMemory(expected, resultPath);
    }

    /**
     * Passes a {@link JobConf} carrying {@code my.cntPrefix=Hello} to
     * {@link ConfigurableCntReducer} and expects, per key, the number of values that start
     * with that prefix.
     */
    @Test
    public void testConfigurationViaJobConf() throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        final JobConf jobConf = new JobConf();
        jobConf.set("my.cntPrefix", "Hello");

        final DataSet<Tuple2<IntWritable, Text>> keyed =
                HadoopTestData.getKVPairDataSet(env).map(new Mapper4());

        final DataSet<Tuple2<IntWritable, IntWritable>> hellos = keyed
                .groupBy(0)
                .reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(
                        new ConfigurableCntReducer(), jobConf));

        final String resultPath = newResultPath();
        hellos.writeAsText(resultPath);
        env.execute();

        final String expected =
                "(0,0)\n" +
                "(1,0)\n" +
                "(2,1)\n" +
                "(3,1)\n" +
                "(4,1)\n";

        compareResultsByLinesInMemory(expected, resultPath);
    }

    /** Reducer that emits the key together with the sum of all its values. */
    public static class SumReducer implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

        @Override
        public void configure(JobConf arg0) { }

        @Override
        public void reduce(IntWritable key, Iterator<IntWritable> values,
                OutputCollector<IntWritable, IntWritable> collector, Reporter reporter)
                throws IOException {
            int total = 0;
            for (; values.hasNext(); ) {
                total += values.next().get();
            }
            collector.collect(key, new IntWritable(total));
        }

        @Override
        public void close() throws IOException { }
    }

    /** Reducer that re-emits every value under the key {@code key % 4}. */
    public static class KeyChangingReducer implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

        @Override
        public void configure(JobConf arg0) { }

        @Override
        public void reduce(IntWritable key, Iterator<IntWritable> values,
                OutputCollector<IntWritable, IntWritable> collector, Reporter reporter)
                throws IOException {
            for (; values.hasNext(); ) {
                final IntWritable remappedKey = new IntWritable(key.get() % 4);
                collector.collect(remappedKey, values.next());
            }
        }

        @Override
        public void close() throws IOException { }
    }

    /**
     * Reducer that counts, per key, the values starting with the prefix read from the
     * {@code my.cntPrefix} entry of the {@link JobConf} passed to {@link #configure(JobConf)}.
     */
    public static class ConfigurableCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
        private String countPrefix;

        @Override
        public void configure(final JobConf jobConf) {
            this.countPrefix = jobConf.get("my.cntPrefix");
        }

        @Override
        public void reduce(IntWritable key, Iterator<Text> values,
                OutputCollector<IntWritable, IntWritable> collector, Reporter reporter)
                throws IOException {
            int matches = 0;
            for (; values.hasNext(); ) {
                final String text = values.next().toString();
                if (text.startsWith(this.countPrefix)) {
                    matches++;
                }
            }
            collector.collect(key, new IntWritable(matches));
        }

        @Override
        public void close() throws IOException { }
    }

    /** Maps a pair to (key / 6, 1); the output tuple instance is reused across calls. */
    public static class Mapper1 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable,
            IntWritable>> {
        private static final long serialVersionUID = 1L;
        Tuple2<IntWritable, IntWritable> outT = new Tuple2<IntWritable, IntWritable>();

        @Override
        public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v)
                throws Exception {
            final int bucket = v.f0.get() / 6;
            outT.f0 = new IntWritable(bucket);
            outT.f1 = new IntWritable(1);
            return outT;
        }
    }

    /** Maps a pair to (0, original key); the output tuple instance is reused across calls. */
    public static class Mapper2 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable,
            IntWritable>> {
        private static final long serialVersionUID = 1L;
        Tuple2<IntWritable, IntWritable> outT = new Tuple2<IntWritable, IntWritable>();

        @Override
        public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v)
                throws Exception {
            outT.f0 = new IntWritable(0);
            outT.f1 = v.f0;
            return outT;
        }
    }

    /** Maps a pair to (original key, 1); the output tuple instance is reused across calls. */
    public static class Mapper3 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, IntWritable>> {
        private static final long serialVersionUID = 1L;
        Tuple2<IntWritable, IntWritable> outT = new Tuple2<IntWritable, IntWritable>();

        @Override
        public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v)
                throws Exception {
            outT.f0 = v.f0;
            outT.f1 = new IntWritable(1);
            return outT;
        }
    }

    /** Replaces the key of the incoming tuple with {@code key % 5} and returns that same tuple. */
    public static class Mapper4 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>> {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> v)
                throws Exception {
            final int bucket = v.f0.get() % 5;
            v.f0 = new IntWritable(bucket);
            return v;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java new file mode 100644 index 0000000..abc0e9c --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.hadoopcompatibility.mapred; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +
/**
 * Integration tests that run Hadoop {@code mapred} {@link Reducer} implementations inside Flink
 * group-reduce operations through {@link HadoopReduceFunction}. Each test writes its result as
 * text and compares it line by line with the expected output.
 */
@RunWith(Parameterized.class)
public class HadoopReduceFunctionITCase extends MultipleProgramsTestBase {

    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    public HadoopReduceFunctionITCase(TestExecutionMode mode) {
        super(mode);
    }

    /** Creates a fresh temporary file and returns its URI in string form. */
    private String newResultPath() throws IOException {
        return tempFolder.newFile().toURI().toString();
    }

    /** Groups on the key produced by {@link Mapper1} and counts "Comment" values per group. */
    @Test
    public void testStandardGrouping() throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        final DataSet<Tuple2<IntWritable, Text>> keyed =
                HadoopTestData.getKVPairDataSet(env).map(new Mapper1());

        final DataSet<Tuple2<IntWritable, IntWritable>> commentCnts = keyed
                .groupBy(0)
                .reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(
                        new CommentCntReducer()));

        final String resultPath = newResultPath();
        commentCnts.writeAsText(resultPath);
        env.execute();

        final String expected =
                "(0,0)\n" +
                "(1,3)\n" +
                "(2,5)\n" +
                "(3,5)\n" +
                "(4,2)\n";

        compareResultsByLinesInMemory(expected, resultPath);
    }

    /** Applies {@link AllCommentCntReducer} without grouping and expects one record under key 42. */
    @Test
    public void testUngroupedHadoopReducer() throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        final DataSet<Tuple2<IntWritable, Text>> input = HadoopTestData.getKVPairDataSet(env);

        final DataSet<Tuple2<IntWritable, IntWritable>> commentCnts = input
                .reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(
                        new AllCommentCntReducer()));

        final String resultPath = newResultPath();
        commentCnts.writeAsText(resultPath);
        env.execute();

        final String expected = "(42,15)\n";

        compareResultsByLinesInMemory(expected, resultPath);
    }

    /**
     * Passes a {@link JobConf} carrying {@code my.cntPrefix=Hello} to
     * {@link ConfigurableCntReducer} and expects, per key, the number of values that start
     * with that prefix.
     */
    @Test
    public void testConfigurationViaJobConf() throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        final JobConf jobConf = new JobConf();
        jobConf.set("my.cntPrefix", "Hello");

        final DataSet<Tuple2<IntWritable, Text>> keyed =
                HadoopTestData.getKVPairDataSet(env).map(new Mapper2());

        final DataSet<Tuple2<IntWritable, IntWritable>> helloCnts = keyed
                .groupBy(0)
                .reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(
                        new ConfigurableCntReducer(), jobConf));

        final String resultPath = newResultPath();
        helloCnts.writeAsText(resultPath);
        env.execute();

        final String expected =
                "(0,0)\n" +
                "(1,0)\n" +
                "(2,1)\n" +
                "(3,1)\n" +
                "(4,1)\n";

        compareResultsByLinesInMemory(expected, resultPath);
    }

    /** Reducer that emits the key with the number of its values starting with "Comment". */
    public static class CommentCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {

        @Override
        public void configure(final JobConf arg0) { }

        @Override
        public void reduce(IntWritable key, Iterator<Text> values,
                OutputCollector<IntWritable, IntWritable> collector, Reporter reporter)
                throws IOException {
            int comments = 0;
            for (; values.hasNext(); ) {
                final String text = values.next().toString();
                if (text.startsWith("Comment")) {
                    comments++;
                }
            }
            collector.collect(key, new IntWritable(comments));
        }

        @Override
        public void close() throws IOException { }
    }

    /** Like {@link CommentCntReducer}, but always emits the count under the fixed key 42. */
    public static class AllCommentCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {

        @Override
        public void configure(final JobConf arg0) { }

        @Override
        public void reduce(IntWritable key, Iterator<Text> values,
                OutputCollector<IntWritable, IntWritable> collector, Reporter reporter)
                throws IOException {
            int comments = 0;
            for (; values.hasNext(); ) {
                final String text = values.next().toString();
                if (text.startsWith("Comment")) {
                    comments++;
                }
            }
            collector.collect(new IntWritable(42), new IntWritable(comments));
        }

        @Override
        public void close() throws IOException { }
    }

    /**
     * Reducer that counts, per key, the values starting with the prefix read from the
     * {@code my.cntPrefix} entry of the {@link JobConf} passed to {@link #configure(JobConf)}.
     */
    public static class ConfigurableCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
        private String countPrefix;

        @Override
        public void configure(final JobConf jobConf) {
            this.countPrefix = jobConf.get("my.cntPrefix");
        }

        @Override
        public void reduce(IntWritable key, Iterator<Text> values,
                OutputCollector<IntWritable, IntWritable> collector, Reporter reporter)
                throws IOException {
            int matches = 0;
            for (; values.hasNext(); ) {
                final String text = values.next().toString();
                if (text.startsWith(this.countPrefix)) {
                    matches++;
                }
            }
            collector.collect(key, new IntWritable(matches));
        }

        @Override
        public void close() throws IOException { }
    }

    /** Replaces the key of the incoming tuple with {@code key / 5} and returns that same tuple. */
    public static class Mapper1 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>> {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> v)
                throws Exception {
            final int bucket = v.f0.get() / 5;
            v.f0 = new IntWritable(bucket);
            return v;
        }
    }

    /** Replaces the key of the incoming tuple with {@code key % 5} and returns that same tuple. */
    public static class Mapper2 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>> {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> v)
                throws Exception {
            final int bucket = v.f0.get() % 5;
            v.f0 = new IntWritable(bucket);
            return v;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java new file mode 100644 index 0000000..eed6f8f --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.hadoopcompatibility.mapred; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +
/**
 * Supplies the small key/value data set shared by the Hadoop compatibility tests.
 */
public class HadoopTestData {

    /**
     * Builds 21 pairs whose keys are 1..21 and whose values are the texts listed below (in that
     * order), shuffles them and hands the shuffled list to the given environment.
     *
     * @param env the environment used to create the data set
     * @return the data set created from the shuffled pairs
     */
    public static DataSet<Tuple2<IntWritable, Text>> getKVPairDataSet(ExecutionEnvironment env) {

        // The text at index i is paired with key i + 1.
        final String[] texts = {
            "Hi",
            "Hello",
            "Hello world",
            "Hello world, how are you?",
            "I am fine.",
            "Luke Skywalker",
            "Comment#1",
            "Comment#2",
            "Comment#3",
            "Comment#4",
            "Comment#5",
            "Comment#6",
            "Comment#7",
            "Comment#8",
            "Comment#9",
            "Comment#10",
            "Comment#11",
            "Comment#12",
            "Comment#13",
            "Comment#14",
            "Comment#15"
        };

        final List<Tuple2<IntWritable, Text>> pairs = new ArrayList<Tuple2<IntWritable, Text>>();
        for (int i = 0; i < texts.length; i++) {
            pairs.add(new Tuple2<IntWritable, Text>(new IntWritable(i + 1), new Text(texts[i])));
        }

        Collections.shuffle(pairs);

        return env.fromCollection(pairs);
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java new file mode 100644 index 0000000..ce0143a --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.hadoopcompatibility.mapred.example; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat; +import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction; +import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat; +import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hadoop.mapred.TextOutputFormat; + + + +
/**
 * Word count built from Hadoop {@code mapred} components: it reads the input through a Hadoop
 * {@link TextInputFormat}, tokenizes with a Hadoop {@link Mapper} ({@link Tokenizer}), sums per
 * word with a Hadoop {@link Reducer} ({@link Counter}) and writes the counts through a Hadoop
 * {@link TextOutputFormat}.
 *
 * <p>The Hadoop functions and formats are plugged into the Flink program via
 * {@link HadoopMapFunction}, {@link HadoopReduceCombineFunction}, {@link HadoopInputFormat} and
 * {@link HadoopOutputFormat}.
 */
public class HadoopMapredCompatWordCount {

    /**
     * Runs the word count.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the result path; with fewer
     *             than two arguments a usage line is printed to stderr and nothing is executed
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: WordCount <input path> <result path>");
            return;
        }

        final String inputPath = args[0];
        final String outputPath = args[1];

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Source: Hadoop text input wrapped for Flink.
        final HadoopInputFormat<LongWritable, Text> source = new HadoopInputFormat<LongWritable, Text>(
                new TextInputFormat(), LongWritable.class, Text.class, new JobConf());
        TextInputFormat.addInputPath(source.getJobConf(), new Path(inputPath));

        final DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(source);

        // Tokenize with the Hadoop mapper, then group by word and sum with the Hadoop reducer.
        final DataSet<Tuple2<Text, LongWritable>> wordCounts = lines
                .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer()))
                .groupBy(0)
                .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
                        new Counter(), new Counter()));

        // Sink: Hadoop text output wrapped for Flink, with a single space as separator.
        final HadoopOutputFormat<Text, LongWritable> sink = new HadoopOutputFormat<Text, LongWritable>(
                new TextOutputFormat<Text, LongWritable>(), new JobConf());
        sink.getJobConf().set("mapred.textoutputformat.separator", " ");
        TextOutputFormat.setOutputPath(sink.getJobConf(), new Path(outputPath));

        wordCounts.output(sink).setParallelism(1);
        env.execute("Hadoop Compat WordCount");
    }


    /** Lower-cases a line, splits it on non-word characters and emits (token, 1) per token. */
    public static final class Tokenizer implements Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        public void configure(JobConf arg0) { }

        @Override
        public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> collector,
                Reporter reporter) throws IOException {
            final String[] tokens = value.toString().toLowerCase().split("\\W+");

            for (String token : tokens) {
                // split() can yield an empty leading token; skip those.
                if (token.length() == 0) {
                    continue;
                }
                collector.collect(new Text(token), new LongWritable(1L));
            }
        }

        @Override
        public void close() throws IOException { }

    }

    /** Emits the key together with the sum of all its values. */
    public static final class Counter implements Reducer<Text, LongWritable, Text, LongWritable> {

        @Override
        public void configure(JobConf arg0) { }

        @Override
        public void reduce(Text key, Iterator<LongWritable> values,
                OutputCollector<Text, LongWritable> collector, Reporter reporter)
                throws IOException {
            long total = 0;
            for (; values.hasNext(); ) {
                total += values.next().get();
            }
            collector.collect(key, new LongWritable(total));
        }

        @Override
        public void close() throws IOException { }
    }

}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java new file mode 100644 index 0000000..524318c --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.hadoopcompatibility.mapred.wrapper; + +import java.util.ArrayList; +import java.util.NoSuchElementException; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.runtime.WritableSerializer; +import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator; +import org.apache.hadoop.io.IntWritable; +import org.junit.Assert; +import org.junit.Test; + +public class HadoopTupleUnwrappingIteratorTest { + + @Test + public void testValueIterator() { + + HadoopTupleUnwrappingIterator<IntWritable, IntWritable> valIt = + new HadoopTupleUnwrappingIterator<IntWritable, IntWritable>(new WritableSerializer + <IntWritable>(IntWritable.class)); + + // many values + + ArrayList<Tuple2<IntWritable, IntWritable>> tList = new ArrayList<Tuple2<IntWritable, IntWritable>>(); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(1))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(2))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(3))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(4))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(5))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(6))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(7))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new 
IntWritable(8))); + + int expectedKey = 1; + int[] expectedValues = new int[] {1,2,3,4,5,6,7,8}; + + valIt.set(tList.iterator()); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + for(int expectedValue : expectedValues) { + Assert.assertTrue(valIt.hasNext()); + Assert.assertTrue(valIt.hasNext()); + Assert.assertTrue(valIt.next().get() == expectedValue); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + } + Assert.assertFalse(valIt.hasNext()); + Assert.assertFalse(valIt.hasNext()); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + + // one value + + tList.clear(); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(2),new IntWritable(10))); + + expectedKey = 2; + expectedValues = new int[]{10}; + + valIt.set(tList.iterator()); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + for(int expectedValue : expectedValues) { + Assert.assertTrue(valIt.hasNext()); + Assert.assertTrue(valIt.hasNext()); + Assert.assertTrue(valIt.next().get() == expectedValue); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + } + Assert.assertFalse(valIt.hasNext()); + Assert.assertFalse(valIt.hasNext()); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + + // more values + + tList.clear(); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(10))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(4))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(7))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(9))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(21))); + + expectedKey = 3; + expectedValues = new int[]{10,4,7,9,21}; + + valIt.set(tList.iterator()); + Assert.assertTrue(valIt.hasNext()); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + for(int expectedValue : expectedValues) { + 
Assert.assertTrue(valIt.hasNext()); + Assert.assertTrue(valIt.hasNext()); + Assert.assertTrue(valIt.next().get() == expectedValue); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + } + Assert.assertFalse(valIt.hasNext()); + Assert.assertFalse(valIt.hasNext()); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + + // no has next calls + + tList.clear(); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(5))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(8))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(42))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(-1))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(0))); + + expectedKey = 4; + expectedValues = new int[]{5,8,42,-1,0}; + + valIt.set(tList.iterator()); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + for(int expectedValue : expectedValues) { + Assert.assertTrue(valIt.next().get() == expectedValue); + } + try { + valIt.next(); + Assert.fail(); + } catch (NoSuchElementException nsee) { + // expected + } + Assert.assertFalse(valIt.hasNext()); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java new file mode 100644 index 0000000..698e356 --- /dev/null +++ 
b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.hadoopcompatibility.mapreduce; + +import org.apache.flink.test.hadoopcompatibility.mapreduce.example.WordCount; +import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.JavaProgramTestBase; + +public class HadoopInputOutputITCase extends JavaProgramTestBase { + + protected String textPath; + protected String resultPath; + + + @Override + protected void preSubmit() throws Exception { + textPath = createTempFile("text.txt", WordCountData.TEXT); + resultPath = getTempDirPath("result"); + this.setParallelism(4); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[]{".", "_"}); + } + + @Override + protected void testProgram() throws Exception { + WordCount.main(new String[] { textPath, resultPath }); + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java new file mode 100644 index 0000000..ed83d78 --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.hadoopcompatibility.mapreduce.example; + +import org.apache.flink.api.java.aggregation.Aggregations; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.Collector; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat; +import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat; + +/** + * Implements a word count which takes the input file and counts the number of + * occurrences of each word in the file and writes the result back to disk. + * + * This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to + * common Java types for better usage in a Flink job and how to use Hadoop Output Formats. 
+ */ +@SuppressWarnings("serial") +public class WordCount { + + public static void main(String[] args) throws Exception { + if (args.length < 2) { + System.err.println("Usage: WordCount <input path> <result path>"); + return; + } + + final String inputPath = args[0]; + final String outputPath = args[1]; + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // Set up the Hadoop Input Format + Job job = Job.getInstance(); + HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, job); + TextInputFormat.addInputPath(job, new Path(inputPath)); + + // Create a Flink job with it + DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat); + + // Tokenize the line and convert from Writable "Text" to String for better handling + DataSet<Tuple2<String, Integer>> words = text.flatMap(new Tokenizer()); + + // Sum up the words + DataSet<Tuple2<String, Integer>> result = words.groupBy(0).aggregate(Aggregations.SUM, 1); + + // Convert String back to Writable "Text" for use with Hadoop Output Format + DataSet<Tuple2<Text, IntWritable>> hadoopResult = result.map(new HadoopDatatypeMapper()); + + // Set up Hadoop Output Format + HadoopOutputFormat<Text, IntWritable> hadoopOutputFormat = new HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), job); + hadoopOutputFormat.getConfiguration().set("mapreduce.output.textoutputformat.separator", " "); + hadoopOutputFormat.getConfiguration().set("mapred.textoutputformat.separator", " "); // set the value for both, since this test + TextOutputFormat.setOutputPath(job, new Path(outputPath)); + + // Output & Execute + hadoopResult.output(hadoopOutputFormat); + env.execute("Word Count"); + } + + /** + * Splits a line into words and converts Hadoop Writables into normal Java data types. 
+ */ + public static final class Tokenizer extends RichFlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> { + + @Override + public void flatMap(Tuple2<LongWritable, Text> value, Collector<Tuple2<String, Integer>> out) { + // normalize and split the line + String line = value.f1.toString(); + String[] tokens = line.toLowerCase().split("\\W+"); + + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new Tuple2<String, Integer>(token, 1)); + } + } + } + } + + /** + * Converts Java data types to Hadoop Writables. + */ + public static final class HadoopDatatypeMapper extends RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> { + + @Override + public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> value) throws Exception { + return new Tuple2<Text, IntWritable>(new Text(value.f0), new IntWritable(value.f1)); + } + + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/resources/log4j-test.properties b/flink-connectors/flink-hadoop-compatibility/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..0b686e5 --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/test/resources/log4j-test.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF and its only appender to A1. +log4j.rootLogger=OFF, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/resources/logback-test.xml b/flink-connectors/flink-hadoop-compatibility/src/test/resources/logback-test.xml new file mode 100644 index 0000000..8b3bb27 --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/test/resources/logback-test.xml @@ -0,0 +1,29 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License.
You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <root level="WARN"> + <appender-ref ref="STDOUT"/> + </root> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hbase/pom.xml b/flink-connectors/flink-hbase/pom.xml new file mode 100644 index 0000000..9b1e174 --- /dev/null +++ b/flink-connectors/flink-hbase/pom.xml @@ -0,0 +1,264 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.2-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-hbase_2.10</artifactId> + <name>flink-hbase</name> + <packaging>jar</packaging> + + <properties> + <hbase.version>1.2.3</hbase.version> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.19.1</version> + <configuration> + <!-- Enforce single fork execution due to heavy mini cluster use in the tests --> + <forkCount>1</forkCount> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <!-- Disable inherited shade-flink because of a problem in the shade plugin --> + <!-- When enabled you'll run into an infinite loop creating the dependency-reduced-pom.xml --> + <!-- Seems similar to https://issues.apache.org/jira/browse/MSHADE-148 --> + <id>shade-flink</id> + <phase>none</phase> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <dependencies> + + <!-- core dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-shaded-hadoop2</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + 
</dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + + <!--Exclude Guava in order to run the HBaseMiniCluster during testing--> + <exclusions> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- HBase server needed for TableOutputFormat --> + <!-- TODO implement bulk output format for HBase --> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-server</artifactId> + <version>${hbase.version}</version> + <exclusions> + <!-- Remove unneeded dependency, which is conflicting with our jetty-util version. --> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty-util</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty-sslengine</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-2.1</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-api-2.1</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>servlet-api-2.5</artifactId> + </exclusion> + <!-- The hadoop dependencies are handled through flink-shaded-hadoop --> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-annotations</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + 
<artifactId>hadoop-client</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + </exclusion> + <!-- Bug in hbase annotations, can be removed when fixed. See FLINK-2153. --> + <exclusion> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-annotations</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- test dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-core</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-hadoop-compatibility_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.apache.flink</groupId> + <artifactId>flink-shaded-include-yarn_2.10</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- Test dependencies are only available for Hadoop-2. 
--> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-server</artifactId> + <version>${hbase.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <version>${hadoop.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-hadoop-compat</artifactId> + <version>${hbase.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>${hadoop.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-hadoop2-compat</artifactId> + <version>${hbase.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + </dependencies> + + <profiles> + <profile> + <id>cdh5.1.3</id> + <properties> + <hbase.version>0.98.1-cdh5.1.3</hbase.version> + <hadoop.version>2.3.0-cdh5.1.3</hadoop.version> + <!-- Cloudera use different versions for hadoop core and commons--> + <!-- This profile could be removed if Cloudera fix this mismatch! 
--> + <hadoop.core.version>2.3.0-mr1-cdh5.1.3</hadoop.core.version> + </properties> + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-core</artifactId> + <version>${hadoop.core.version}</version> + </dependency> + </dependencies> + </dependencyManagement> + </profile> + + </profiles> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java new file mode 100644 index 0000000..35b0a7c --- /dev/null +++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.addons.hbase; + +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.io.LocatableInputSplitAssigner; +import org.apache.flink.api.common.io.RichInputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * {@link InputFormat} subclass that wraps the access for HTables. + */ +public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<T, TableInputSplit> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(TableInputFormat.class); + + /** helper variable to decide whether the input is exhausted or not */ + private boolean endReached = false; + + protected transient HTable table = null; + protected transient Scan scan = null; + + /** HBase iterator wrapper */ + private ResultScanner resultScanner = null; + + private byte[] lastRow; + private int scannedRows; + + /** + * Returns an instance of Scan that retrieves the required subset of records from the HBase table. + * @return The appropriate instance of Scan for this usecase. + */ + protected abstract Scan getScanner(); + + /** + * What table is to be read. + * Per instance of a TableInputFormat derivative only a single tablename is possible. 
+ * @return The name of the table + */ + protected abstract String getTableName(); + + /** + * The output from HBase is always an instance of {@link Result}. + * This method is to copy the data in the Result instance into the required {@link Tuple} + * @param r The Result instance from HBase that needs to be converted + * @return The approriate instance of {@link Tuple} that contains the needed information. + */ + protected abstract T mapResultToTuple(Result r); + + /** + * Creates a {@link Scan} object and opens the {@link HTable} connection. + * These are opened here because they are needed in the createInputSplits + * which is called before the openInputFormat method. + * So the connection is opened in {@link #configure(Configuration)} and closed in {@link #closeInputFormat()}. + * + * @param parameters The configuration that is to be used + * @see Configuration + */ + @Override + public void configure(Configuration parameters) { + table = createTable(); + if (table != null) { + scan = getScanner(); + } + } + + /** + * Create an {@link HTable} instance and set it into this format + */ + private HTable createTable() { + LOG.info("Initializing HBaseConfiguration"); + //use files found in the classpath + org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create(); + + try { + return new HTable(hConf, getTableName()); + } catch (Exception e) { + LOG.error("Error instantiating a new HTable instance", e); + } + return null; + } + + @Override + public void open(TableInputSplit split) throws IOException { + if (table == null) { + throw new IOException("The HBase table has not been opened!"); + } + if (scan == null) { + throw new IOException("getScanner returned null"); + } + if (split == null) { + throw new IOException("Input split is null!"); + } + + logSplitInfo("opening", split); + scan.setStartRow(split.getStartRow()); + lastRow = split.getEndRow(); + scan.setStopRow(lastRow); + + resultScanner = table.getScanner(scan); + endReached = false; + 
scannedRows = 0; + } + + @Override + public boolean reachedEnd() throws IOException { + return endReached; + } + + @Override + public T nextRecord(T reuse) throws IOException { + if (resultScanner == null) { + throw new IOException("No table result scanner provided!"); + } + try { + Result res = resultScanner.next(); + if (res != null) { + scannedRows++; + lastRow = res.getRow(); + return mapResultToTuple(res); + } + } catch (Exception e) { + resultScanner.close(); + //workaround for timeout on scan + LOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e); + scan.setStartRow(lastRow); + resultScanner = table.getScanner(scan); + Result res = resultScanner.next(); + if (res != null) { + scannedRows++; + lastRow = res.getRow(); + return mapResultToTuple(res); + } + } + + endReached = true; + return null; + } + + @Override + public void close() throws IOException { + LOG.info("Closing split (scanned {} rows)", scannedRows); + lastRow = null; + try { + if (resultScanner != null) { + resultScanner.close(); + } + } finally { + resultScanner = null; + } + } + + @Override + public void closeInputFormat() throws IOException { + try { + if (table != null) { + table.close(); + } + } finally { + table = null; + } + } + + @Override + public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException { + if (table == null) { + throw new IOException("The HBase table has not been opened!"); + } + if (scan == null) { + throw new IOException("getScanner returned null"); + } + + //Gets the starting and ending row keys for every region in the currently open table + final Pair<byte[][], byte[][]> keys = table.getStartEndKeys(); + if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { + throw new IOException("Expecting at least one region."); + } + final byte[] startRow = scan.getStartRow(); + final byte[] stopRow = scan.getStopRow(); + final boolean scanWithNoLowerBound = startRow.length == 0; + final boolean 
scanWithNoUpperBound = stopRow.length == 0; + + final List<TableInputSplit> splits = new ArrayList<TableInputSplit>(minNumSplits); + for (int i = 0; i < keys.getFirst().length; i++) { + final byte[] startKey = keys.getFirst()[i]; + final byte[] endKey = keys.getSecond()[i]; + final String regionLocation = table.getRegionLocation(startKey, false).getHostnamePort(); + //Test if the given region is to be included in the InputSplit while splitting the regions of a table + if (!includeRegionInSplit(startKey, endKey)) { + continue; + } + //Finds the region on which the given row is being served + final String[] hosts = new String[]{regionLocation}; + + // determine if regions contains keys used by the scan + boolean isLastRegion = endKey.length == 0; + if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0) && + (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) { + + final byte[] splitStart = scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0 ? startKey : startRow; + final byte[] splitStop = (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0) + && !isLastRegion ? endKey : stopRow; + int id = splits.size(); + final TableInputSplit split = new TableInputSplit(id, hosts, table.getTableName(), splitStart, splitStop); + splits.add(split); + } + } + LOG.info("Created " + splits.size() + " splits"); + for (TableInputSplit split : splits) { + logSplitInfo("created", split); + } + return splits.toArray(new TableInputSplit[0]); + } + + private void logSplitInfo(String action, TableInputSplit split) { + int splitId = split.getSplitNumber(); + String splitStart = Bytes.toString(split.getStartRow()); + String splitEnd = Bytes.toString(split.getEndRow()); + String splitStartKey = splitStart.isEmpty() ? "-" : splitStart; + String splitStopKey = splitEnd.isEmpty() ? 
"-" : splitEnd; + String[] hostnames = split.getHostnames(); + LOG.info("{} split (this={})[{}|{}|{}|{}]", action, this, splitId, hostnames, splitStartKey, splitStopKey); + } + + /** + * Test if the given region is to be included in the InputSplit while splitting the regions of a table. + * <p> + * This optimization is effective when there is a specific reasoning to exclude an entire region from the M-R job, + * (and hence, not contributing to the InputSplit), given the start and end keys of the same. <br> + * Useful when we need to remember the last-processed top record and revisit the [last, current) interval for M-R + * processing, continuously. In addition to reducing InputSplits, reduces the load on the region server as well, due + * to the ordering of the keys. <br> + * <br> + * Note: It is possible that <code>endKey.length() == 0 </code> , for the last (recent) region. <br> + * Override this method, if you want to bulk exclude regions altogether from M-R. By default, no region is excluded( + * i.e. all regions are included). + * + * @param startKey Start key of the region + * @param endKey End key of the region + * @return true, if this region needs to be included as part of the input (default). 
+ */ + protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) { + return true; + } + + @Override + public InputSplitAssigner getInputSplitAssigner(TableInputSplit[] inputSplits) { + return new LocatableInputSplitAssigner(inputSplits); + } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStatistics) { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java new file mode 100644 index 0000000..75f0b9b --- /dev/null +++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.addons.hbase; + +import org.apache.flink.core.io.LocatableInputSplit; + +/** + * This class implements a input splits for HBase. 
Each table input split corresponds to a key range (low, high). All + * references to row below refer to the key of the row. + */ +public class TableInputSplit extends LocatableInputSplit { + + private static final long serialVersionUID = 1L; + + /** The name of the table to retrieve data from */ + private final byte[] tableName; + + /** The start row of the split. */ + private final byte[] startRow; + + /** The end row of the split. */ + private final byte[] endRow; + + /** + * Creates a new table input split + * + * @param splitNumber + * the number of the input split + * @param hostnames + * the names of the hosts storing the data the input split refers to + * @param tableName + * the name of the table to retrieve data from + * @param startRow + * the start row of the split + * @param endRow + * the end row of the split + */ + TableInputSplit(final int splitNumber, final String[] hostnames, final byte[] tableName, final byte[] startRow, + final byte[] endRow) { + super(splitNumber, hostnames); + + this.tableName = tableName; + this.startRow = startRow; + this.endRow = endRow; + } + + /** + * Returns the table name. + * + * @return The table name. + */ + public byte[] getTableName() { + return this.tableName; + } + + /** + * Returns the start row. + * + * @return The start row. + */ + public byte[] getStartRow() { + return this.startRow; + } + + /** + * Returns the end row. + * + * @return The end row. 
+ */ + public byte[] getEndRow() { + return this.endRow; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java new file mode 100644 index 0000000..3d9f672 --- /dev/null +++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java @@ -0,0 +1,238 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.addons.hbase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.ScannerCallable; +import org.apache.hadoop.hbase.ipc.AbstractRpcClient; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.log4j.Level; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.Serializable; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * By using this class as the super class of a set of tests you will have a HBase testing + * cluster available that is very suitable for writing tests for scanning and filtering against. + * This is usable by any downstream application because the HBase cluster is 'injected' because + * a dynamically generated hbase-site.xml is added to the classpath. 
+ * Because of this classpath manipulation it is not possible to start a second testing cluster in the same JVM. + * So if you have this you should either put all hbase related tests in a single class or force surefire to + * setup a new JVM for each testclass. + * See: http://maven.apache.org/surefire/maven-surefire-plugin/examples/fork-options-and-parallel-execution.html + */ +// +// NOTE: The code in this file is based on code from the +// Apache HBase project, licensed under the Apache License v 2.0 +// +// https://github.com/apache/hbase/blob/master/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/FilterTestingCluster.java +// +public class HBaseTestingClusterAutostarter implements Serializable { + + private static final Log LOG = LogFactory.getLog(HBaseTestingClusterAutostarter.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static HBaseAdmin admin = null; + private static List<TableName> createdTables = new ArrayList<>(); + + private static boolean alreadyRegisteredTestCluster = false; + + protected static void createTable(TableName tableName, byte[] columnFamilyName, byte[][] splitKeys) { + LOG.info("HBase minicluster: Creating table " + tableName.getNameAsString()); + + assertNotNull("HBaseAdmin is not initialized successfully.", admin); + HTableDescriptor desc = new HTableDescriptor(tableName); + HColumnDescriptor colDef = new HColumnDescriptor(columnFamilyName); + desc.addFamily(colDef); + + try { + admin.createTable(desc, splitKeys); + createdTables.add(tableName); + assertTrue("Fail to create the table", admin.tableExists(tableName)); + } catch (IOException e) { + assertNull("Exception found while creating table", e); + } + } + + protected static HTable openTable(TableName tableName) throws IOException { + HTable table = (HTable) admin.getConnection().getTable(tableName); + assertTrue("Fail to create the table", admin.tableExists(tableName)); + return table; + } + + private static void 
deleteTables() { + if (admin != null) { + for (TableName tableName : createdTables) { + try { + if (admin.tableExists(tableName)) { + admin.disableTable(tableName); + admin.deleteTable(tableName); + } + } catch (IOException e) { + assertNull("Exception found deleting the table", e); + } + } + } + } + + private static void initialize(Configuration conf) { + conf = HBaseConfiguration.create(conf); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + try { + admin = TEST_UTIL.getHBaseAdmin(); + } catch (MasterNotRunningException e) { + assertNull("Master is not running", e); + } catch (ZooKeeperConnectionException e) { + assertNull("Cannot connect to ZooKeeper", e); + } catch (IOException e) { + assertNull("IOException", e); + } + } + + @BeforeClass + public static void setUp() throws Exception { + LOG.info("HBase minicluster: Starting"); + ((Log4JLogger) RpcServer.LOG).getLogger().setLevel(Level.ALL); + ((Log4JLogger) AbstractRpcClient.LOG).getLogger().setLevel(Level.ALL); + ((Log4JLogger) ScannerCallable.LOG).getLogger().setLevel(Level.ALL); + + TEST_UTIL.startMiniCluster(1); + + // https://issues.apache.org/jira/browse/HBASE-11711 + TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", -1); + + // Make sure the zookeeper quorum value contains the right port number (varies per run). + TEST_UTIL.getConfiguration().set("hbase.zookeeper.quorum", "localhost:" + TEST_UTIL.getZkCluster().getClientPort()); + + initialize(TEST_UTIL.getConfiguration()); + LOG.info("HBase minicluster: Running"); + } + + private static File hbaseSiteXmlDirectory; + private static File hbaseSiteXmlFile; + + /** + * This dynamically generates a hbase-site.xml file that is added to the classpath. + * This way this HBaseMinicluster can be used by an unmodified application. + * The downside is that this cannot be 'unloaded' so you can have only one per JVM. 
+ */ + public static void registerHBaseMiniClusterInClasspath() { + if (alreadyRegisteredTestCluster) { + fail("You CANNOT register a second HBase Testing cluster in the classpath of the SAME JVM"); + } + File baseDir = new File(System.getProperty("java.io.tmpdir", "/tmp/")); + hbaseSiteXmlDirectory = new File(baseDir, "unittest-hbase-minicluster-" + Math.abs(new Random().nextLong()) + "/"); + + if (!hbaseSiteXmlDirectory.mkdirs()) { + fail("Unable to create output directory " + hbaseSiteXmlDirectory + " for the HBase minicluster"); + } + + assertNotNull("The ZooKeeper for the HBase minicluster is missing", TEST_UTIL.getZkCluster()); + + createHBaseSiteXml(hbaseSiteXmlDirectory, TEST_UTIL.getConfiguration().get("hbase.zookeeper.quorum")); + addDirectoryToClassPath(hbaseSiteXmlDirectory); + + // Avoid starting it again. + alreadyRegisteredTestCluster = true; + } + + private static void createHBaseSiteXml(File hbaseSiteXmlDirectory, String zookeeperQuorum) { + hbaseSiteXmlFile = new File(hbaseSiteXmlDirectory, "hbase-site.xml"); + // Create the hbase-site.xml file for this run. 
+ try { + String hbaseSiteXml = "<?xml version=\"1.0\"?>\n" + + "<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>\n" + + "<configuration>\n" + + " <property>\n" + + " <name>hbase.zookeeper.quorum</name>\n" + + " <value>" + zookeeperQuorum + "</value>\n" + + " </property>\n" + + "</configuration>"; + OutputStream fos = new FileOutputStream(hbaseSiteXmlFile); + fos.write(hbaseSiteXml.getBytes(StandardCharsets.UTF_8)); + fos.close(); + } catch (IOException e) { + fail("Unable to create " + hbaseSiteXmlFile); + } + } + + private static void addDirectoryToClassPath(File directory) { + try { + // Get the classloader actually used by HBaseConfiguration + ClassLoader classLoader = HBaseConfiguration.create().getClassLoader(); + if (!(classLoader instanceof URLClassLoader)) { + fail("We should get a URLClassLoader"); + } + + // Make the addURL method accessible + Method method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class); + method.setAccessible(true); + + // Add the directory where we put the hbase-site.xml to the classpath + method.invoke(classLoader, directory.toURI().toURL()); + } catch (MalformedURLException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + fail("Unable to add " + directory + " to classpath because of this exception: " + e.getMessage()); + } + } + + @AfterClass + public static void tearDown() throws Exception { + LOG.info("HBase minicluster: Shutting down"); + deleteTables(); + hbaseSiteXmlFile.delete(); + hbaseSiteXmlDirectory.delete(); + TEST_UTIL.shutdownMiniCluster(); + LOG.info("HBase minicluster: Down"); + } + +}
