http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java deleted file mode 100644 index c32f5da..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.hadoop.io.Writable; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -public class StringArrayWritable implements Writable, Comparable<StringArrayWritable> { - - private String[] array = new String[0]; - - public StringArrayWritable() { - super(); - } - - public StringArrayWritable(String[] array) { - this.array = array; - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeInt(this.array.length); - - for(String str : this.array) { - byte[] b = str.getBytes(); - out.writeInt(b.length); - out.write(b); - } - } - - @Override - public void readFields(DataInput in) throws IOException { - this.array = new String[in.readInt()]; - - for(int i = 0; i < this.array.length; i++) { - byte[] b = new byte[in.readInt()]; - in.readFully(b); - this.array[i] = new String(b); - } - } - - @Override - public int compareTo(StringArrayWritable o) { - if(this.array.length != o.array.length) { - return this.array.length - o.array.length; - } - - for(int i = 0; i < this.array.length; i++) { - int comp = this.array[i].compareTo(o.array[i]); - if(comp != 0) { - return comp; - } - } - return 0; - } - - @Override - public boolean equals(Object obj) { - if(!(obj instanceof StringArrayWritable)) { - return false; - } - return this.compareTo((StringArrayWritable) obj) == 0; - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java deleted file mode 100644 index 96f844c..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.ComparatorTestBase; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; - -public class WritableComparatorTest extends ComparatorTestBase<StringArrayWritable> { - - StringArrayWritable[] data = new StringArrayWritable[]{ - new StringArrayWritable(new String[]{}), - new StringArrayWritable(new String[]{""}), - new StringArrayWritable(new String[]{"a","a"}), - new StringArrayWritable(new String[]{"a","b"}), - new StringArrayWritable(new String[]{"c","c"}), - new StringArrayWritable(new String[]{"d","f"}), - new StringArrayWritable(new String[]{"d","m"}), - new StringArrayWritable(new String[]{"z","x"}), - new StringArrayWritable(new String[]{"a","a", "a"}) - }; - - @Override - protected TypeComparator<StringArrayWritable> createComparator(boolean ascending) { - return new WritableComparator<StringArrayWritable>(ascending, StringArrayWritable.class); - } - - @Override - protected TypeSerializer<StringArrayWritable> createSerializer() { - return new WritableSerializer<StringArrayWritable>(StringArrayWritable.class); - } - - @Override - protected StringArrayWritable[] getSortedTestData() { - return data; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java deleted file mode 100644 index 94e759d..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.ComparatorTestBase; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; - -import java.util.UUID; - -public class WritableComparatorUUIDTest extends ComparatorTestBase<WritableID> { - @Override - protected TypeComparator<WritableID> createComparator(boolean ascending) { - return new WritableComparator<>(ascending, WritableID.class); - } - - @Override - protected TypeSerializer<WritableID> createSerializer() { - return new WritableSerializer<>(WritableID.class); - } - - @Override - protected WritableID[] getSortedTestData() { - return new WritableID[] { - new WritableID(new UUID(0, 0)), - new WritableID(new UUID(1, 0)), - new WritableID(new UUID(1, 1)) - }; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java deleted file mode 100644 index 4274cf6..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.hadoop.io.WritableComparable; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.UUID; - -public class WritableID implements WritableComparable<WritableID> { - private UUID uuid; - - public WritableID() { - this.uuid = UUID.randomUUID(); - } - - public WritableID(UUID uuid) { - this.uuid = uuid; - } - - @Override - public int compareTo(WritableID o) { - return this.uuid.compareTo(o.uuid); - } - - @Override - public void write(DataOutput dataOutput) throws IOException { - dataOutput.writeLong(uuid.getMostSignificantBits()); - dataOutput.writeLong(uuid.getLeastSignificantBits()); - } - - @Override - public void readFields(DataInput dataInput) throws IOException { - this.uuid = new UUID(dataInput.readLong(), dataInput.readLong()); - } - - @Override - public String toString() { - return uuid.toString(); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - WritableID id = (WritableID) o; - - return !(uuid != null ? !uuid.equals(id.uuid) : id.uuid != null); - } - - @Override - public int hashCode() { - return uuid != null ? uuid.hashCode() : 0; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java deleted file mode 100644 index bb5f4d4..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeutils.SerializerTestInstance; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.api.java.typeutils.WritableTypeInfo; -import org.junit.Test; - -public class WritableSerializerTest { - - @Test - public void testStringArrayWritable() { - StringArrayWritable[] data = new StringArrayWritable[]{ - new StringArrayWritable(new String[]{}), - new StringArrayWritable(new String[]{""}), - new StringArrayWritable(new String[]{"a","a"}), - new StringArrayWritable(new String[]{"a","b"}), - new StringArrayWritable(new String[]{"c","c"}), - new StringArrayWritable(new String[]{"d","f"}), - new StringArrayWritable(new String[]{"d","m"}), - new StringArrayWritable(new String[]{"z","x"}), - new StringArrayWritable(new String[]{"a","a", "a"}) - }; - - WritableTypeInfo<StringArrayWritable> writableTypeInfo = (WritableTypeInfo<StringArrayWritable>) TypeExtractor.getForObject(data[0]); - WritableSerializer<StringArrayWritable> writableSerializer = (WritableSerializer<StringArrayWritable>) writableTypeInfo.createSerializer(new ExecutionConfig()); - - SerializerTestInstance<StringArrayWritable> testInstance = new SerializerTestInstance<StringArrayWritable>(writableSerializer,writableTypeInfo.getTypeClass(), -1, data); - - testInstance.testAll(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java deleted file mode 100644 index 2af7730..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.SerializerTestBase; -import org.apache.flink.api.common.typeutils.TypeSerializer; - -import java.util.UUID; - -public class WritableSerializerUUIDTest extends SerializerTestBase<WritableID> { - @Override - protected TypeSerializer<WritableID> createSerializer() { - return new WritableSerializer<>(WritableID.class); - } - - @Override - protected int getLength() { - return -1; - } - - @Override - protected Class<WritableID> getTypeClass() { - return WritableID.class; - } - - @Override - protected WritableID[] getTestData() { - return new WritableID[] { - new WritableID(new UUID(0, 0)), - new WritableID(new UUID(1, 0)), - new WritableID(new UUID(1, 1)) - }; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java deleted file mode 100644 index 6f7673b..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/HadoopUtilsTest.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.hadoopcompatibility; - -import org.apache.flink.api.java.utils.AbstractParameterToolTest; -import org.apache.flink.api.java.utils.ParameterTool; -import org.junit.Test; - -import java.io.IOException; - -public class HadoopUtilsTest extends AbstractParameterToolTest { - - @Test - public void testParamsFromGenericOptionsParser() throws IOException { - ParameterTool parameter = HadoopUtils.paramsFromGenericOptionsParser(new String[]{"-D", "input=myInput", "-DexpectedCount=15"}); - validate(parameter); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java deleted file mode 100644 index 4d1acb4..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.hadoopcompatibility.mapred; - -import java.io.IOException; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class HadoopMapFunctionITCase extends MultipleProgramsTestBase { - - public HadoopMapFunctionITCase(TestExecutionMode mode){ - super(mode); - } - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Test - public void testNonPassingMapper() throws Exception{ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env); - DataSet<Tuple2<IntWritable, Text>> nonPassingFlatMapDs = ds. - flatMap(new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new NonPassingMapper())); - - String resultPath = tempFolder.newFile().toURI().toString(); - - nonPassingFlatMapDs.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - env.execute(); - - compareResultsByLinesInMemory("\n", resultPath); - } - - @Test - public void testDataDuplicatingMapper() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env); - DataSet<Tuple2<IntWritable, Text>> duplicatingFlatMapDs = ds. - flatMap(new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new DuplicatingMapper())); - - String resultPath = tempFolder.newFile().toURI().toString(); - - duplicatingFlatMapDs.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - env.execute(); - - String expected = "(1,Hi)\n" + "(1,HI)\n" + - "(2,Hello)\n" + "(2,HELLO)\n" + - "(3,Hello world)\n" + "(3,HELLO WORLD)\n" + - "(4,Hello world, how are you?)\n" + "(4,HELLO WORLD, HOW ARE YOU?)\n" + - "(5,I am fine.)\n" + "(5,I AM FINE.)\n" + - "(6,Luke Skywalker)\n" + "(6,LUKE SKYWALKER)\n" + - "(7,Comment#1)\n" + "(7,COMMENT#1)\n" + - "(8,Comment#2)\n" + "(8,COMMENT#2)\n" + - "(9,Comment#3)\n" + "(9,COMMENT#3)\n" + - "(10,Comment#4)\n" + "(10,COMMENT#4)\n" + - "(11,Comment#5)\n" + "(11,COMMENT#5)\n" + - "(12,Comment#6)\n" + "(12,COMMENT#6)\n" + - "(13,Comment#7)\n" + "(13,COMMENT#7)\n" + - "(14,Comment#8)\n" + "(14,COMMENT#8)\n" + - "(15,Comment#9)\n" + "(15,COMMENT#9)\n" + - "(16,Comment#10)\n" + "(16,COMMENT#10)\n" + - "(17,Comment#11)\n" + "(17,COMMENT#11)\n" + - "(18,Comment#12)\n" + "(18,COMMENT#12)\n" + - "(19,Comment#13)\n" + "(19,COMMENT#13)\n" + - "(20,Comment#14)\n" + "(20,COMMENT#14)\n" + - "(21,Comment#15)\n" + "(21,COMMENT#15)\n"; - - compareResultsByLinesInMemory(expected, resultPath); - } - - @Test - public void testConfigurableMapper() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - JobConf conf = new JobConf(); - conf.set("my.filterPrefix", "Hello"); - - DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env); - DataSet<Tuple2<IntWritable, Text>> hellos = ds. - flatMap(new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new ConfigurableMapper(), conf)); - - String resultPath = tempFolder.newFile().toURI().toString(); - - hellos.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - env.execute(); - - String expected = "(2,Hello)\n" + - "(3,Hello world)\n" + - "(4,Hello world, how are you?)\n"; - - compareResultsByLinesInMemory(expected, resultPath); - } - - - - public static class NonPassingMapper implements Mapper<IntWritable, Text, IntWritable, Text> { - - @Override - public void map(final IntWritable k, final Text v, - final OutputCollector<IntWritable, Text> out, final Reporter r) throws IOException { - if ( v.toString().contains("bananas") ) { - out.collect(k,v); - } - } - - @Override - public void configure(final JobConf arg0) { } - - @Override - public void close() throws IOException { } - } - - public static class DuplicatingMapper implements Mapper<IntWritable, Text, IntWritable, Text> { - - @Override - public void map(final IntWritable k, final Text v, - final OutputCollector<IntWritable, Text> out, final Reporter r) throws IOException { - out.collect(k, v); - out.collect(k, new Text(v.toString().toUpperCase())); - } - - @Override - public void configure(final JobConf arg0) { } - - @Override - public void close() throws IOException { } - } - - public static class ConfigurableMapper implements Mapper<IntWritable, Text, IntWritable, Text> { - private String filterPrefix; - - @Override - public void map(IntWritable k, Text v, OutputCollector<IntWritable, Text> out, Reporter r) - throws IOException { - if(v.toString().startsWith(filterPrefix)) { - out.collect(k, v); - } - } - - @Override - public void configure(JobConf c) { - filterPrefix = c.get("my.filterPrefix"); - } - - @Override - public void close() throws IOException { } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java deleted file mode 100644 index ccc0d82..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.hadoopcompatibility.mapred; - -import org.apache.flink.test.hadoopcompatibility.mapred.example.HadoopMapredCompatWordCount; -import org.apache.flink.test.testdata.WordCountData; -import org.apache.flink.test.util.JavaProgramTestBase; - -public class HadoopMapredITCase extends JavaProgramTestBase { - - protected String textPath; - protected String resultPath; - - @Override - protected void preSubmit() throws Exception { - textPath = createTempFile("text.txt", WordCountData.TEXT); - resultPath = getTempDirPath("result"); - this.setParallelism(4); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[]{".", "_"}); - } - - @Override - protected void testProgram() throws Exception { - HadoopMapredCompatWordCount.main(new String[] { textPath, resultPath }); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java deleted file mode 100644 index 13d971c..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java +++ /dev/null @@ -1,265 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.hadoopcompatibility.mapred; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction; -import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; -import org.hamcrest.core.IsEqual; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBase { - - public HadoopReduceCombineFunctionITCase(TestExecutionMode mode){ - super(mode); - } - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Test - public void testStandardCountingWithCombiner() throws Exception{ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<IntWritable, IntWritable>> ds = HadoopTestData.getKVPairDataSet(env). - map(new Mapper1()); - - DataSet<Tuple2<IntWritable, IntWritable>> counts = ds. - groupBy(0). - reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>( - new SumReducer(), new SumReducer())); - - String resultPath = tempFolder.newFile().toURI().toString(); - - counts.writeAsText(resultPath); - env.execute(); - - String expected = "(0,5)\n"+ - "(1,6)\n" + - "(2,6)\n" + - "(3,4)\n"; - - compareResultsByLinesInMemory(expected, resultPath); - } - - @Test - public void testUngroupedHadoopReducer() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<IntWritable, IntWritable>> ds = HadoopTestData.getKVPairDataSet(env). - map(new Mapper2()); - - DataSet<Tuple2<IntWritable, IntWritable>> sum = ds. - reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>( - new SumReducer(), new SumReducer())); - - String resultPath = tempFolder.newFile().toURI().toString(); - - sum.writeAsText(resultPath); - env.execute(); - - String expected = "(0,231)\n"; - - compareResultsByLinesInMemory(expected, resultPath); - } - - @Test - public void testCombiner() throws Exception { - org.junit.Assume.assumeThat(mode, new IsEqual<TestExecutionMode>(TestExecutionMode.CLUSTER)); - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<IntWritable, IntWritable>> ds = HadoopTestData.getKVPairDataSet(env). - map(new Mapper3()); - - DataSet<Tuple2<IntWritable, IntWritable>> counts = ds. - groupBy(0). - reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>( - new SumReducer(), new KeyChangingReducer())); - - String resultPath = tempFolder.newFile().toURI().toString(); - - counts.writeAsText(resultPath); - env.execute(); - - String expected = "(0,5)\n"+ - "(1,6)\n" + - "(2,5)\n" + - "(3,5)\n"; - - compareResultsByLinesInMemory(expected, resultPath); - } - - @Test - public void testConfigurationViaJobConf() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - JobConf conf = new JobConf(); - conf.set("my.cntPrefix", "Hello"); - - DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env). - map(new Mapper4()); - - DataSet<Tuple2<IntWritable, IntWritable>> hellos = ds. - groupBy(0). - reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>( - new ConfigurableCntReducer(), conf)); - - String resultPath = tempFolder.newFile().toURI().toString(); - - hellos.writeAsText(resultPath); - env.execute(); - - // return expected result - String expected = "(0,0)\n"+ - "(1,0)\n" + - "(2,1)\n" + - "(3,1)\n" + - "(4,1)\n"; - - compareResultsByLinesInMemory(expected, resultPath); - } - - public static class SumReducer implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> { - - @Override - public void reduce(IntWritable k, Iterator<IntWritable> v, OutputCollector<IntWritable, IntWritable> out, Reporter r) - throws IOException { - - int sum = 0; - while(v.hasNext()) { - sum += v.next().get(); - } - out.collect(k, new IntWritable(sum)); - } - - @Override - public void configure(JobConf arg0) { } - - @Override - public void close() throws IOException { } - } - - public static class KeyChangingReducer implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> { - - @Override - public void reduce(IntWritable k, Iterator<IntWritable> v, OutputCollector<IntWritable, IntWritable> out, Reporter r) - throws IOException { - while(v.hasNext()) { - out.collect(new IntWritable(k.get() % 4), v.next()); - } - } - - @Override - public void configure(JobConf arg0) { } - - @Override - public void close() throws IOException { } - } - - public static class ConfigurableCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> { - private String countPrefix; - - @Override - public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r) - throws IOException { - int commentCnt = 0; - while(vs.hasNext()) { - String v = vs.next().toString(); - if(v.startsWith(this.countPrefix)) { - commentCnt++; - } - } - out.collect(k, new IntWritable(commentCnt)); - } - - @Override - public void configure(final JobConf c) { - this.countPrefix = c.get("my.cntPrefix"); - } - - @Override - public void close() throws IOException { } - } - - public static class Mapper1 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, - IntWritable>> { - private static final long serialVersionUID = 1L; - Tuple2<IntWritable,IntWritable> outT = new Tuple2<IntWritable,IntWritable>(); - @Override - public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v) - throws Exception { - outT.f0 = new IntWritable(v.f0.get() / 6); - outT.f1 = new IntWritable(1); - return outT; - } - } - - public static class Mapper2 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, - IntWritable>> { - private static final long serialVersionUID = 1L; - Tuple2<IntWritable,IntWritable> outT = new Tuple2<IntWritable,IntWritable>(); - @Override - public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v) - throws Exception { - outT.f0 = new IntWritable(0); - outT.f1 = v.f0; - return outT; - } - } - - public static class Mapper3 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, IntWritable>> { - private static final long serialVersionUID = 1L; - Tuple2<IntWritable,IntWritable> outT = new Tuple2<IntWritable,IntWritable>(); - @Override - public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v) - throws Exception { - outT.f0 = v.f0; - outT.f1 = new IntWritable(1); - return outT; - } - } - - public static class Mapper4 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>> { - private static final long serialVersionUID = 1L; - @Override - public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> v) - throws Exception { - v.f0 = new IntWritable(v.f0.get() % 5); - return v; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java deleted file mode 100644 index abc0e9c..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.hadoopcompatibility.mapred; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class HadoopReduceFunctionITCase extends MultipleProgramsTestBase { - - public HadoopReduceFunctionITCase(TestExecutionMode mode){ - super(mode); - } - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Test - public void testStandardGrouping() throws Exception{ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env). - map(new Mapper1()); - - DataSet<Tuple2<IntWritable, IntWritable>> commentCnts = ds. - groupBy(0). - reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(new CommentCntReducer())); - - String resultPath = tempFolder.newFile().toURI().toString(); - - commentCnts.writeAsText(resultPath); - env.execute(); - - String expected = "(0,0)\n"+ - "(1,3)\n" + - "(2,5)\n" + - "(3,5)\n" + - "(4,2)\n"; - - compareResultsByLinesInMemory(expected, resultPath); - } - - @Test - public void testUngroupedHadoopReducer() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env); - - DataSet<Tuple2<IntWritable, IntWritable>> commentCnts = ds. - reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(new AllCommentCntReducer())); - - String resultPath = tempFolder.newFile().toURI().toString(); - - commentCnts.writeAsText(resultPath); - env.execute(); - - String expected = "(42,15)\n"; - - compareResultsByLinesInMemory(expected, resultPath); - } - - @Test - public void testConfigurationViaJobConf() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - JobConf conf = new JobConf(); - conf.set("my.cntPrefix", "Hello"); - - DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env). - map(new Mapper2()); - - DataSet<Tuple2<IntWritable, IntWritable>> helloCnts = ds. - groupBy(0). - reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>( - new ConfigurableCntReducer(), conf)); - - String resultPath = tempFolder.newFile().toURI().toString(); - - helloCnts.writeAsText(resultPath); - env.execute(); - - String expected = "(0,0)\n"+ - "(1,0)\n" + - "(2,1)\n" + - "(3,1)\n" + - "(4,1)\n"; - - compareResultsByLinesInMemory(expected, resultPath); - } - - public static class CommentCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> { - - @Override - public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r) - throws IOException { - int commentCnt = 0; - while(vs.hasNext()) { - String v = vs.next().toString(); - if(v.startsWith("Comment")) { - commentCnt++; - } - } - out.collect(k, new IntWritable(commentCnt)); - } - - @Override - public void configure(final JobConf arg0) { } - - @Override - public void close() throws IOException { } - } - - public static class AllCommentCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> { - - @Override - public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r) - throws IOException { - int commentCnt = 0; - while(vs.hasNext()) { - String v = vs.next().toString(); - if(v.startsWith("Comment")) { - commentCnt++; - } - } - out.collect(new IntWritable(42), new IntWritable(commentCnt)); - } - - @Override - public void configure(final JobConf arg0) { } - - @Override - public void close() throws IOException { } - } - - public static class ConfigurableCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> { - private String countPrefix; - - @Override - public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r) - throws IOException { - int commentCnt = 0; - while(vs.hasNext()) { - String v = vs.next().toString(); - if(v.startsWith(this.countPrefix)) { - commentCnt++; - } - } - out.collect(k, new IntWritable(commentCnt)); - } - - @Override - public void configure(final JobConf c) { - this.countPrefix = c.get("my.cntPrefix"); - } - - @Override - public void close() throws IOException { } - } - - public static class Mapper1 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>> { - private static final long serialVersionUID = 1L; - @Override - public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> v) - throws Exception { - v.f0 = new IntWritable(v.f0.get() / 5); - return v; - } - } - - public static class Mapper2 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>> { - private static final long serialVersionUID = 1L; - @Override - public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> v) - throws Exception { - v.f0 = new IntWritable(v.f0.get() % 5); - return v; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java deleted file mode 100644 index eed6f8f..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.hadoopcompatibility.mapred; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; - -public class HadoopTestData { - - public static DataSet<Tuple2<IntWritable, Text>> getKVPairDataSet(ExecutionEnvironment env) { - - List<Tuple2<IntWritable, Text>> data = new ArrayList<Tuple2<IntWritable, Text>>(); - data.add(new Tuple2<IntWritable, Text>(new IntWritable(1),new Text("Hi"))); - data.add(new Tuple2<IntWritable, Text>(new IntWritable(2),new Text("Hello"))); - data.add(new Tuple2<IntWritable, Text>(new IntWritable(3),new Text("Hello world"))); - data.add(new Tuple2<IntWritable, Text>(new IntWritable(4),new Text("Hello world, how are you?"))); - data.add(new Tuple2<IntWritable, Text>(new IntWritable(5),new Text("I am fine."))); - data.add(new Tuple2<IntWritable, Text>(new IntWritable(6),new Text("Luke Skywalker"))); - data.add(new Tuple2<IntWritable, Text>(new IntWritable(7),new Text("Comment#1"))); - data.add(new Tuple2<IntWritable, Text>(new IntWritable(8),new Text("Comment#2"))); - data.add(new Tuple2<IntWritable, Text>(new IntWritable(9),new Text("Comment#3"))); - data.add(new Tuple2<IntWritable, Text>(new IntWritable(10),new Text("Comment#4"))); - data.add(new Tuple2<IntWritable, Text>(new IntWritable(11),new Text("Comment#5"))); - data.add(new Tuple2<IntWritable, Text>(new IntWritable(12),new Text("Comment#6"))); - data.add(new Tuple2<IntWritable, Text>(new IntWritable(13),new Text("Comment#7"))); - data.add(new Tuple2<IntWritable, Text>(new IntWritable(14),new Text("Comment#8"))); - data.add(new Tuple2<IntWritable, Text>(new IntWritable(15),new Text("Comment#9"))); - data.add(new Tuple2<IntWritable, Text>(new IntWritable(16),new Text("Comment#10"))); - data.add(new Tuple2<IntWritable, Text>(new IntWritable(17),new Text("Comment#11"))); - data.add(new Tuple2<IntWritable, Text>(new IntWritable(18),new Text("Comment#12"))); - data.add(new Tuple2<IntWritable, Text>(new IntWritable(19),new Text("Comment#13"))); - data.add(new Tuple2<IntWritable, Text>(new IntWritable(20),new Text("Comment#14"))); - data.add(new Tuple2<IntWritable, Text>(new IntWritable(21),new Text("Comment#15"))); - - Collections.shuffle(data); - - return env.fromCollection(data); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java deleted file mode 100644 index ce0143a..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.hadoopcompatibility.mapred.example; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat; -import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction; -import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat; -import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.TextInputFormat; -import org.apache.hadoop.mapred.TextOutputFormat; - - - -/** - * Implements a word count which takes the input file and counts the number of - * occurrences of each word in the file and writes the result back to disk. - * - * This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to - * common Java types for better usage in a Flink job and how to use Hadoop Output Formats. - */ -public class HadoopMapredCompatWordCount { - - public static void main(String[] args) throws Exception { - if (args.length < 2) { - System.err.println("Usage: WordCount <input path> <result path>"); - return; - } - - final String inputPath = args[0]; - final String outputPath = args[1]; - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - // Set up the Hadoop Input Format - HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, new JobConf()); - TextInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(inputPath)); - - // Create a Flink job with it - DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat); - - DataSet<Tuple2<Text, LongWritable>> words = - text.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer())) - .groupBy(0).reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(new Counter(), new Counter())); - - // Set up Hadoop Output Format - HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat = - new HadoopOutputFormat<Text, LongWritable>(new TextOutputFormat<Text, LongWritable>(), new JobConf()); - hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " "); - TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), new Path(outputPath)); - - // Output & Execute - words.output(hadoopOutputFormat).setParallelism(1); - env.execute("Hadoop Compat WordCount"); - } - - - public static final class Tokenizer implements Mapper<LongWritable, Text, Text, LongWritable> { - - @Override - public void map(LongWritable k, Text v, OutputCollector<Text, LongWritable> out, Reporter rep) - throws IOException { - // normalize and split the line - String line = v.toString(); - String[] tokens = line.toLowerCase().split("\\W+"); - - // emit the pairs - for (String token : tokens) { - if (token.length() > 0) { - out.collect(new Text(token), new LongWritable(1l)); - } - } - } - - @Override - public void configure(JobConf arg0) { } - - @Override - public void close() throws IOException { } - - } - - public static final class Counter implements Reducer<Text, LongWritable, Text, LongWritable> { - - @Override - public void reduce(Text k, Iterator<LongWritable> vs, OutputCollector<Text, LongWritable> out, Reporter rep) - throws IOException { - - long cnt = 0; - while(vs.hasNext()) { - cnt += vs.next().get(); - } - out.collect(k, new LongWritable(cnt)); - - } - - @Override - public void configure(JobConf arg0) { } - - @Override - public void close() throws IOException { } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java deleted file mode 100644 index 524318c..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.hadoopcompatibility.mapred.wrapper; - -import java.util.ArrayList; -import java.util.NoSuchElementException; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.runtime.WritableSerializer; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator; -import org.apache.hadoop.io.IntWritable; -import org.junit.Assert; -import org.junit.Test; - -public class HadoopTupleUnwrappingIteratorTest { - - @Test - public void testValueIterator() { - - HadoopTupleUnwrappingIterator<IntWritable, IntWritable> valIt = - new HadoopTupleUnwrappingIterator<IntWritable, IntWritable>(new WritableSerializer - <IntWritable>(IntWritable.class)); - - // many values - - ArrayList<Tuple2<IntWritable, IntWritable>> tList = new ArrayList<Tuple2<IntWritable, IntWritable>>(); - tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(1))); - tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(2))); - tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(3))); - tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(4))); - tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(5))); - tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(6))); - tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(7))); - tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(8))); - - int expectedKey = 1; - int[] expectedValues = new int[] {1,2,3,4,5,6,7,8}; - - valIt.set(tList.iterator()); - Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); - for(int expectedValue : expectedValues) { - Assert.assertTrue(valIt.hasNext()); - Assert.assertTrue(valIt.hasNext()); - Assert.assertTrue(valIt.next().get() == expectedValue); - Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); - } - Assert.assertFalse(valIt.hasNext()); - Assert.assertFalse(valIt.hasNext()); - Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); - - // one value - - tList.clear(); - tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(2),new IntWritable(10))); - - expectedKey = 2; - expectedValues = new int[]{10}; - - valIt.set(tList.iterator()); - Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); - for(int expectedValue : expectedValues) { - Assert.assertTrue(valIt.hasNext()); - Assert.assertTrue(valIt.hasNext()); - Assert.assertTrue(valIt.next().get() == expectedValue); - Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); - } - Assert.assertFalse(valIt.hasNext()); - Assert.assertFalse(valIt.hasNext()); - Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); - - // more values - - tList.clear(); - tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(10))); - tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(4))); - tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(7))); - tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(9))); - tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(21))); - - expectedKey = 3; - expectedValues = new int[]{10,4,7,9,21}; - - valIt.set(tList.iterator()); - Assert.assertTrue(valIt.hasNext()); - Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); - for(int expectedValue : expectedValues) { - Assert.assertTrue(valIt.hasNext()); - Assert.assertTrue(valIt.hasNext()); - Assert.assertTrue(valIt.next().get() == expectedValue); - Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); - } - Assert.assertFalse(valIt.hasNext()); - Assert.assertFalse(valIt.hasNext()); - Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); - - // no has next calls - - tList.clear(); - tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(5))); - tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(8))); - tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(42))); - tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(-1))); - tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(0))); - - expectedKey = 4; - expectedValues = new int[]{5,8,42,-1,0}; - - valIt.set(tList.iterator()); - Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); - for(int expectedValue : expectedValues) { - Assert.assertTrue(valIt.next().get() == expectedValue); - } - try { - valIt.next(); - Assert.fail(); - } catch (NoSuchElementException nsee) { - // expected - } - Assert.assertFalse(valIt.hasNext()); - Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java deleted file mode 100644 index 698e356..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.hadoopcompatibility.mapreduce; - -import org.apache.flink.test.hadoopcompatibility.mapreduce.example.WordCount; -import org.apache.flink.test.testdata.WordCountData; -import org.apache.flink.test.util.JavaProgramTestBase; - -public class HadoopInputOutputITCase extends JavaProgramTestBase { - - protected String textPath; - protected String resultPath; - - - @Override - protected void preSubmit() throws Exception { - textPath = createTempFile("text.txt", WordCountData.TEXT); - resultPath = getTempDirPath("result"); - this.setParallelism(4); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[]{".", "_"}); - } - - @Override - protected void testProgram() throws Exception { - WordCount.main(new String[] { textPath, resultPath }); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java deleted file mode 100644 index ed83d78..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.hadoopcompatibility.mapreduce.example; - -import org.apache.flink.api.java.aggregation.Aggregations; -import org.apache.flink.api.common.functions.RichFlatMapFunction; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.util.Collector; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat; -import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat; - -/** - * Implements a word count which takes the input file and counts the number of - * occurrences of each word in the file and writes the result back to disk. - * - * This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to - * common Java types for better usage in a Flink job and how to use Hadoop Output Formats. - */ -@SuppressWarnings("serial") -public class WordCount { - - public static void main(String[] args) throws Exception { - if (args.length < 2) { - System.err.println("Usage: WordCount <input path> <result path>"); - return; - } - - final String inputPath = args[0]; - final String outputPath = args[1]; - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - // Set up the Hadoop Input Format - Job job = Job.getInstance(); - HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, job); - TextInputFormat.addInputPath(job, new Path(inputPath)); - - // Create a Flink job with it - DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat); - - // Tokenize the line and convert from Writable "Text" to String for better handling - DataSet<Tuple2<String, Integer>> words = text.flatMap(new Tokenizer()); - - // Sum up the words - DataSet<Tuple2<String, Integer>> result = words.groupBy(0).aggregate(Aggregations.SUM, 1); - - // Convert String back to Writable "Text" for use with Hadoop Output Format - DataSet<Tuple2<Text, IntWritable>> hadoopResult = result.map(new HadoopDatatypeMapper()); - - // Set up Hadoop Output Format - HadoopOutputFormat<Text, IntWritable> hadoopOutputFormat = new HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), job); - hadoopOutputFormat.getConfiguration().set("mapreduce.output.textoutputformat.separator", " "); - hadoopOutputFormat.getConfiguration().set("mapred.textoutputformat.separator", " "); // set the value for both, since this test - TextOutputFormat.setOutputPath(job, new Path(outputPath)); - - // Output & Execute - hadoopResult.output(hadoopOutputFormat); - env.execute("Word Count"); - } - - /** - * Splits a line into words and converts Hadoop Writables into normal Java data types. - */ - public static final class Tokenizer extends RichFlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> { - - @Override - public void flatMap(Tuple2<LongWritable, Text> value, Collector<Tuple2<String, Integer>> out) { - // normalize and split the line - String line = value.f1.toString(); - String[] tokens = line.toLowerCase().split("\\W+"); - - // emit the pairs - for (String token : tokens) { - if (token.length() > 0) { - out.collect(new Tuple2<String, Integer>(token, 1)); - } - } - } - } - - /** - * Converts Java data types to Hadoop Writables. - */ - public static final class HadoopDatatypeMapper extends RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> { - - @Override - public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> value) throws Exception { - return new Tuple2<Text, IntWritable>(new Text(value.f0), new IntWritable(value.f1)); - } - - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/log4j-test.properties b/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/log4j-test.properties deleted file mode 100644 index 0b686e5..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/log4j-test.properties +++ /dev/null @@ -1,27 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -# Set root logger level to DEBUG and its only appender to A1. -log4j.rootLogger=OFF, A1 - -# A1 is set to be a ConsoleAppender. -log4j.appender.A1=org.apache.log4j.ConsoleAppender - -# A1 uses PatternLayout. -log4j.appender.A1.layout=org.apache.log4j.PatternLayout -log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/logback-test.xml b/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/logback-test.xml deleted file mode 100644 index 8b3bb27..0000000 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/logback-test.xml +++ /dev/null @@ -1,29 +0,0 @@ -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<configuration> - <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> - </encoder> - </appender> - - <root level="WARN"> - <appender-ref ref="STDOUT"/> - </root> -</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/pom.xml ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hbase/pom.xml b/flink-batch-connectors/flink-hbase/pom.xml deleted file mode 100644 index 70a5692..0000000 --- a/flink-batch-connectors/flink-hbase/pom.xml +++ /dev/null @@ -1,264 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-batch-connectors</artifactId> - <version>1.2-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-hbase_2.10</artifactId> - <name>flink-hbase</name> - <packaging>jar</packaging> - - <properties> - <hbase.version>1.2.3</hbase.version> - </properties> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <version>2.19.1</version> - <configuration> - <!-- Enforce single fork execution due to heavy mini cluster use in the tests --> - <forkCount>1</forkCount> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <executions> - <execution> - <!-- Disable inherited shade-flink because of a problem in the shade plugin --> - <!-- When enabled you'll run into an infinite loop creating the dependency-reduced-pom.xml --> - <!-- Seems similar to https://issues.apache.org/jira/browse/MSHADE-148 --> - <id>shade-flink</id> - <phase>none</phase> - </execution> - </executions> - </plugin> - </plugins> - </build> - - <dependencies> - - <!-- core dependencies --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-shaded-hadoop2</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_2.10</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - - <!--Exclude Guava in order to run the HBaseMiniCluster during testing--> - <exclusions> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </exclusion> - </exclusions> - </dependency> - - <!-- HBase server needed for TableOutputFormat --> - <!-- TODO implement bulk output format for HBase --> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-server</artifactId> - <version>${hbase.version}</version> - <exclusions> - <!-- Remove unneeded dependency, which is conflicting with our jetty-util version. --> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty-util</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty-sslengine</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jsp-2.1</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jsp-api-2.1</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>servlet-api-2.5</artifactId> - </exclusion> - <!-- The hadoop dependencies are handled through flink-shaded-hadoop --> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-auth</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-annotations</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - </exclusion> - <!-- Bug in hbase annotations, can be removed when fixed. See FLINK-2153. --> - <exclusion> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-annotations</artifactId> - </exclusion> - </exclusions> - </dependency> - - <!-- test dependencies --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-clients_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-hadoop-compatibility_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>org.apache.flink</groupId> - <artifactId>flink-shaded-include-yarn_2.10</artifactId> - </exclusion> - </exclusions> - </dependency> - - <!-- Test dependencies are only available for Hadoop-2. --> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-server</artifactId> - <version>${hbase.version}</version> - <classifier>tests</classifier> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-minicluster</artifactId> - <version>${hadoop.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-hadoop-compat</artifactId> - <version>${hbase.version}</version> - <scope>test</scope> - <type>test-jar</type> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <version>${hadoop.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-hadoop2-compat</artifactId> - <version>${hbase.version}</version> - <scope>test</scope> - <type>test-jar</type> - </dependency> - </dependencies> - - <profiles> - <profile> - <id>cdh5.1.3</id> - <properties> - <hbase.version>0.98.1-cdh5.1.3</hbase.version> - <hadoop.version>2.3.0-cdh5.1.3</hadoop.version> - <!-- Cloudera use different versions for hadoop core and commons--> - <!-- This profile could be removed if Cloudera fix this mismatch! --> - <hadoop.core.version>2.3.0-mr1-cdh5.1.3</hadoop.core.version> - </properties> - <dependencyManagement> - <dependencies> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> - <version>${hadoop.core.version}</version> - </dependency> - </dependencies> - </dependencyManagement> - </profile> - - </profiles> - -</project>
