Repository: flink Updated Branches: refs/heads/master 8f4139a42 -> 586f81813
Revert "[FLINK-2608] Updated Twitter Chill version." This reverts commit 0d3ff88b369fbb1b0a8fb0e8263c9ce0a9da1583. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/586f8181 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/586f8181 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/586f8181 Branch: refs/heads/master Commit: 586f818130d988ed6ca7ac06679451f6d493da32 Parents: 8fddae8 Author: Ufuk Celebi <[email protected]> Authored: Wed Jan 18 11:27:43 2017 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Wed Jan 18 15:27:16 2017 +0100 ---------------------------------------------------------------------- flink-runtime/pom.xml | 13 -- .../util/CollectionDataSets.java | 32 ++-- .../kryo/KryoCollectionsSerializerTest.java | 185 ------------------- pom.xml | 2 +- 4 files changed, 16 insertions(+), 216 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/586f8181/flink-runtime/pom.xml ---------------------------------------------------------------------- diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml index 6e51080..30d83b0 100644 --- a/flink-runtime/pom.xml +++ b/flink-runtime/pom.xml @@ -154,19 +154,6 @@ under the License. <groupId>com.twitter</groupId> <artifactId>chill_${scala.binary.version}</artifactId> <version>${chill.version}</version> - <exclusions> - <!-- Exclude Kryo dependency from Chill --> - <exclusion> - <groupId>com.esotericsoftware</groupId> - <artifactId>kryo-shaded</artifactId> - </exclusion> - </exclusions> - </dependency> - - <!-- Include our own version of Kryo --> - <dependency> - <groupId>com.esotericsoftware.kryo</groupId> - <artifactId>kryo</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/flink/blob/586f8181/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java index 389a18f..ba48e12 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java @@ -23,8 +23,6 @@ import java.io.Serializable; import java.math.BigDecimal; import java.math.BigInteger; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.Date; @@ -48,10 +46,10 @@ import scala.math.BigInt; /** * ####################################################################################################### - * - * BE AWARE THAT OTHER TESTS DEPEND ON THIS TEST DATA. + * + * BE AWARE THAT OTHER TESTS DEPEND ON THIS TEST DATA. * IF YOU MODIFY THE DATA MAKE SURE YOU CHECK THAT ALL TESTS ARE STILL WORKING! - * + * * ####################################################################################################### */ public class CollectionDataSets { @@ -203,7 +201,7 @@ public class CollectionDataSets { return env.fromCollection(data, type); } - + public static DataSet<Tuple2<byte[], Integer>> getTuple2WithByteArrayDataSet(ExecutionEnvironment env) { List<Tuple2<byte[], Integer>> data = new ArrayList<>(); data.add(new Tuple2<>(new byte[]{0, 4}, 1)); @@ -212,12 +210,12 @@ public class CollectionDataSets { data.add(new Tuple2<>(new byte[]{2, 1}, 3)); data.add(new Tuple2<>(new byte[]{0}, 0)); data.add(new Tuple2<>(new byte[]{2, 0}, 1)); - + TupleTypeInfo<Tuple2<byte[], Integer>> type = new TupleTypeInfo<>( PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO ); - + return env.fromCollection(data, type); } @@ -349,13 +347,13 @@ public class CollectionDataSets { data.add(new Tuple7<>(3, "Third", 30, 300, 3000L, "Three", 30000L)); return env.fromCollection(data); } - + public static DataSet<Tuple7<Long, Integer, Integer, Long, String, Integer, String>> getSmallTuplebasedDataSetMatchingPojo(ExecutionEnvironment env) { List<Tuple7<Long, Integer, Integer, Long, String, Integer, String>> data = new ArrayList<>(); data.add(new Tuple7<>(10000L, 10, 100, 1000L, "One", 1, "First")); data.add(new Tuple7<>(20000L, 20, 200, 2000L, "Two", 2, "Second")); data.add(new Tuple7<>(30000L, 30, 300, 3000L, "Three", 3, "Third")); - + return env.fromCollection(data); } @@ -612,22 +610,22 @@ public class CollectionDataSets { public Date date; public Category cat; } - + public static DataSet<PojoWithDateAndEnum> getPojoWithDateAndEnum(ExecutionEnvironment env) { List<PojoWithDateAndEnum> data = new ArrayList<>(); - + PojoWithDateAndEnum one = new PojoWithDateAndEnum(); one.group = "a"; one.date = new Date(666); one.cat = Category.CAT_A; data.add(one); - + PojoWithDateAndEnum two = new PojoWithDateAndEnum(); two.group = "a"; two.date = new Date(666); two.cat = Category.CAT_A; data.add(two); - + PojoWithDateAndEnum three = new PojoWithDateAndEnum(); three.group = "b"; three.date = new Date(666); three.cat = Category.CAT_B; data.add(three); - + return env.fromCollection(data); } @@ -695,7 +693,7 @@ public class CollectionDataSets { pwc1.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN); pwc1.scalaBigInt = BigInt.int2bigInt(10); pwc1.bigDecimalKeepItNull = null; - + // use calendar to make it stable across time zones GregorianCalendar gcl1 = new GregorianCalendar(2033, 4, 18); pwc1.sqlDate = new java.sql.Date(gcl1.getTimeInMillis()); @@ -712,7 +710,7 @@ public class CollectionDataSets { pwc2.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN); pwc2.scalaBigInt = BigInt.int2bigInt(31104000); pwc2.bigDecimalKeepItNull = null; - + GregorianCalendar gcl2 = new GregorianCalendar(1976, 4, 3); pwc2.sqlDate = new java.sql.Date(gcl2.getTimeInMillis()); // 1976 http://git-wip-us.apache.org/repos/asf/flink/blob/586f8181/flink-tests/src/test/java/org/apache/flink/test/runtime/kryo/KryoCollectionsSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/kryo/KryoCollectionsSerializerTest.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/kryo/KryoCollectionsSerializerTest.java deleted file mode 100644 index 0e8f482..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/kryo/KryoCollectionsSerializerTest.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.runtime.kryo; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import com.esotericsoftware.kryo.Kryo; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeutils.ComparatorTestBase; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest; -import org.apache.flink.api.java.typeutils.runtime.TestDataOutputSerializer; -import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.Random; -import java.util.Set; - -@SuppressWarnings("unchecked") -public class KryoCollectionsSerializerTest extends AbstractGenericTypeSerializerTest { - - private ExecutionConfig ec = new ExecutionConfig(); - - @Test - public void testJavaList(){ - Collection<Integer> a = new ArrayList<>(); - fillCollection(a); - runTests(a); - } - - @Test - public void testJavaSet(){ - Collection<Integer> b = new HashSet<>(); - fillCollection(b); - runTests(b); - } - - @Test - public void testJavaDequeue(){ - Collection<Integer> c = new LinkedList<>(); - fillCollection(c); - runTests(c); - } - - @Test - public void testJavaArraysAsList(){ - Collection<Integer> a = Arrays.asList(42, 1337, 49, 1); - runTests(a); - } - - @Test - public void testJavaUnmodifiableSet(){ - Set<Integer> b = new HashSet<>(); - fillCollection(b); - runTests(Collections.unmodifiableSet(b)); - } - - @Test - public void testJavaSingletonList(){ - Collection<Integer> c = Collections.singletonList(42); - runTests(c); - } - - private void fillCollection(Collection<Integer> coll) { - coll.add(42); - coll.add(1337); - coll.add(49); - coll.add(1); - } - - @Override - protected <T> TypeSerializer<T> createSerializer(Class<T> type) { - return new KryoSerializer<T>(type, ec); - } - - /** - * Make sure that the kryo serializer forwards EOF exceptions properly when serializing - */ - @Test - public void testForwardEOFExceptionWhileSerializing() { - try { - // construct a long string - String str; - { - char[] charData = new char[40000]; - Random rnd = new Random(); - - for (int i = 0; i < charData.length; i++) { - charData[i] = (char) rnd.nextInt(10000); - } - - str = new String(charData); - } - - // construct a memory target that is too small for the string - TestDataOutputSerializer target = new TestDataOutputSerializer(10000, 30000); - KryoSerializer<String> serializer = new KryoSerializer<String>(String.class, new ExecutionConfig()); - - try { - serializer.serialize(str, target); - fail("should throw a java.io.EOFException"); - } - catch (java.io.EOFException e) { - // that is how we like it - } - catch (Exception e) { - fail("throws wrong exception: should throw a java.io.EOFException, has thrown a " + e.getClass().getName()); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - /** - * Make sure that the kryo serializer forwards EOF exceptions properly when serializing - */ - @Test - public void testForwardEOFExceptionWhileDeserializing() { - try { - int numElements = 100; - // construct a memory target that is too small for the string - TestDataOutputSerializer target = new TestDataOutputSerializer(5*numElements, 5*numElements); - KryoSerializer<Integer> serializer = new KryoSerializer<>(Integer.class, new ExecutionConfig()); - - for(int i = 0; i < numElements; i++){ - serializer.serialize(i, target); - } - - ComparatorTestBase.TestInputView source = new ComparatorTestBase.TestInputView(target.copyByteBuffer()); - - for(int i = 0; i < numElements; i++){ - int value = serializer.deserialize(source); - assertEquals(i, value); - } - - try { - serializer.deserialize(source); - fail("should throw a java.io.EOFException"); - } - catch (java.io.EOFException e) { - // that is how we like it :-) - } - catch (Exception e) { - fail("throws wrong exception: should throw a java.io.EOFException, has thrown a " + e.getClass().getName()); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void validateReferenceMappingEnabled() { - KryoSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig()); - Kryo kryo = serializer.getKryo(); - assertTrue(kryo.getReferences()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/586f8181/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 5f27ee5..5d20db6 100644 --- a/pom.xml +++ b/pom.xml @@ -98,7 +98,7 @@ under the License. <!-- Default scala versions, may be overwritten by build profiles --> <scala.version>2.10.4</scala.version> <scala.binary.version>2.10</scala.binary.version> - <chill.version>0.8.1</chill.version> + <chill.version>0.7.4</chill.version> <asm.version>5.0.4</asm.version> <zookeeper.version>3.4.6</zookeeper.version> <curator.version>2.8.0</curator.version>
