http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java deleted file mode 100644 index 51bc8d1..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util.serialization; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.runtime.util.DataInputDeserializer; -import org.apache.flink.runtime.util.DataOutputSerializer; - -import java.io.IOException; - -/** - * A serialization and deserialization schema for Key Value Pairs that uses Flink's serialization stack to - * transform typed from and to byte arrays. - * - * @param <K> The key type to be serialized. - * @param <V> The value type to be serialized. - */ -public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDeserializationSchema<Tuple2<K, V>>, KeyedSerializationSchema<Tuple2<K,V>> { - - private static final long serialVersionUID = -5359448468131559102L; - - /** The serializer for the key */ - private final TypeSerializer<K> keySerializer; - - /** The serializer for the value */ - private final TypeSerializer<V> valueSerializer; - - /** reusable input deserialization buffer */ - private final DataInputDeserializer inputDeserializer; - - /** reusable output serialization buffer for the key */ - private transient DataOutputSerializer keyOutputSerializer; - - /** reusable output serialization buffer for the value */ - private transient DataOutputSerializer valueOutputSerializer; - - - /** The type information, to be returned by {@link #getProducedType()}. It is - * transient, because it is not serializable. Note that this means that the type information - * is not available at runtime, but only prior to the first serialization / deserialization */ - private final transient TypeInformation<Tuple2<K, V>> typeInfo; - - // ------------------------------------------------------------------------ - - /** - * Creates a new de-/serialization schema for the given types. - * - * @param keyTypeInfo The type information for the key type de-/serialized by this schema. - * @param valueTypeInfo The type information for the value type de-/serialized by this schema. - * @param ec The execution config, which is used to parametrize the type serializers. - */ - public TypeInformationKeyValueSerializationSchema(TypeInformation<K> keyTypeInfo, TypeInformation<V> valueTypeInfo, ExecutionConfig ec) { - this.typeInfo = new TupleTypeInfo<>(keyTypeInfo, valueTypeInfo); - this.keySerializer = keyTypeInfo.createSerializer(ec); - this.valueSerializer = valueTypeInfo.createSerializer(ec); - this.inputDeserializer = new DataInputDeserializer(); - } - - /** - * Creates a new de-/serialization schema for the given types. This constructor accepts the types - * as classes and internally constructs the type information from the classes. - * - * <p>If the types are parametrized and cannot be fully defined via classes, use the constructor - * that accepts {@link TypeInformation} instead. - * - * @param keyClass The class of the key de-/serialized by this schema. - * @param valueClass The class of the value de-/serialized by this schema. - * @param config The execution config, which is used to parametrize the type serializers. - */ - public TypeInformationKeyValueSerializationSchema(Class<K> keyClass, Class<V> valueClass, ExecutionConfig config) { - this(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass), config); - } - - // ------------------------------------------------------------------------ - - - @Override - public Tuple2<K, V> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { - K key = null; - V value = null; - - if (messageKey != null) { - inputDeserializer.setBuffer(messageKey, 0, messageKey.length); - key = keySerializer.deserialize(inputDeserializer); - } - if (message != null) { - inputDeserializer.setBuffer(message, 0, message.length); - value = valueSerializer.deserialize(inputDeserializer); - } - return new Tuple2<>(key, value); - } - - /** - * This schema never considers an element to signal end-of-stream, so this method returns always false. - * @param nextElement The element to test for the end-of-stream signal. - * @return Returns false. - */ - @Override - public boolean isEndOfStream(Tuple2<K,V> nextElement) { - return false; - } - - - @Override - public byte[] serializeKey(Tuple2<K, V> element) { - if (element.f0 == null) { - return null; - } else { - // key is not null. serialize it: - if (keyOutputSerializer == null) { - keyOutputSerializer = new DataOutputSerializer(16); - } - try { - keySerializer.serialize(element.f0, keyOutputSerializer); - } - catch (IOException e) { - throw new RuntimeException("Unable to serialize record", e); - } - // check if key byte array size changed - byte[] res = keyOutputSerializer.getByteArray(); - if (res.length != keyOutputSerializer.length()) { - byte[] n = new byte[keyOutputSerializer.length()]; - System.arraycopy(res, 0, n, 0, keyOutputSerializer.length()); - res = n; - } - keyOutputSerializer.clear(); - return res; - } - } - - @Override - public byte[] serializeValue(Tuple2<K, V> element) { - // if the value is null, its serialized value is null as well. - if (element.f1 == null) { - return null; - } - - if (valueOutputSerializer == null) { - valueOutputSerializer = new DataOutputSerializer(16); - } - - try { - valueSerializer.serialize(element.f1, valueOutputSerializer); - } - catch (IOException e) { - throw new RuntimeException("Unable to serialize record", e); - } - - byte[] res = valueOutputSerializer.getByteArray(); - if (res.length != valueOutputSerializer.length()) { - byte[] n = new byte[valueOutputSerializer.length()]; - System.arraycopy(res, 0, n, 0, valueOutputSerializer.length()); - res = n; - } - valueOutputSerializer.clear(); - return res; - } - - @Override - public String getTargetTopic(Tuple2<K, V> element) { - return null; // we are never overriding the topic - } - - - @Override - public TypeInformation<Tuple2<K,V>> getProducedType() { - if (typeInfo != null) { - return typeInfo; - } - else { - throw new IllegalStateException( - "The type information is not available after this class has been serialized and distributed."); - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java deleted file mode 100644 index b96ba30..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java +++ /dev/null @@ -1,416 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.commons.collections.map.LinkedMap; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.state.OperatorStateStore; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.state.StateInitializationContext; -import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.util.SerializedValue; -import org.junit.Assert; -import org.junit.Test; -import org.mockito.Matchers; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import java.io.Serializable; -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class FlinkKafkaConsumerBaseTest { - - /** - * Tests that not both types of timestamp extractors / watermark generators can be used. - */ - @Test - public void testEitherWatermarkExtractor() { - try { - new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks<Object>) null); - fail(); - } catch (NullPointerException ignored) {} - - try { - new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks<Object>) null); - fail(); - } catch (NullPointerException ignored) {} - - @SuppressWarnings("unchecked") - final AssignerWithPeriodicWatermarks<String> periodicAssigner = mock(AssignerWithPeriodicWatermarks.class); - @SuppressWarnings("unchecked") - final AssignerWithPunctuatedWatermarks<String> punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class); - - DummyFlinkKafkaConsumer<String> c1 = new DummyFlinkKafkaConsumer<>(); - c1.assignTimestampsAndWatermarks(periodicAssigner); - try { - c1.assignTimestampsAndWatermarks(punctuatedAssigner); - fail(); - } catch (IllegalStateException ignored) {} - - DummyFlinkKafkaConsumer<String> c2 = new DummyFlinkKafkaConsumer<>(); - c2.assignTimestampsAndWatermarks(punctuatedAssigner); - try { - c2.assignTimestampsAndWatermarks(periodicAssigner); - fail(); - } catch (IllegalStateException ignored) {} - } - - /** - * Tests that no checkpoints happen when the fetcher is not running. - */ - @Test - public void ignoreCheckpointWhenNotRunning() throws Exception { - @SuppressWarnings("unchecked") - final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class); - - FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, new LinkedMap(), false); - OperatorStateStore operatorStateStore = mock(OperatorStateStore.class); - TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = new TestingListState<>(); - when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState); - - consumer.snapshotState(new StateSnapshotContextSynchronousImpl(1, 1)); - - assertFalse(listState.get().iterator().hasNext()); - consumer.notifyCheckpointComplete(66L); - } - - /** - * Tests that no checkpoints happen when the fetcher is not running. - */ - @Test - public void checkRestoredCheckpointWhenFetcherNotReady() throws Exception { - OperatorStateStore operatorStateStore = mock(OperatorStateStore.class); - - TestingListState<Serializable> listState = new TestingListState<>(); - listState.add(Tuple2.of(new KafkaTopicPartition("abc", 13), 16768L)); - listState.add(Tuple2.of(new KafkaTopicPartition("def", 7), 987654321L)); - - FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true); - - when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState); - - StateInitializationContext initializationContext = mock(StateInitializationContext.class); - - when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore); - when(initializationContext.isRestored()).thenReturn(true); - - consumer.initializeState(initializationContext); - - consumer.snapshotState(new StateSnapshotContextSynchronousImpl(17, 17)); - - // ensure that the list was cleared and refilled. while this is an implementation detail, we use it here - // to figure out that snapshotState() actually did something. - Assert.assertTrue(listState.isClearCalled()); - - Set<Serializable> expected = new HashSet<>(); - - for (Serializable serializable : listState.get()) { - expected.add(serializable); - } - - int counter = 0; - - for (Serializable serializable : listState.get()) { - assertTrue(expected.contains(serializable)); - counter++; - } - - assertEquals(expected.size(), counter); - } - - /** - * Tests that no checkpoints happen when the fetcher is not running. - */ - @Test - public void checkRestoredNullCheckpointWhenFetcherNotReady() throws Exception { - FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true); - - OperatorStateStore operatorStateStore = mock(OperatorStateStore.class); - TestingListState<Serializable> listState = new TestingListState<>(); - when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState); - - StateInitializationContext initializationContext = mock(StateInitializationContext.class); - - when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore); - when(initializationContext.isRestored()).thenReturn(false); - - consumer.initializeState(initializationContext); - - consumer.snapshotState(new StateSnapshotContextSynchronousImpl(17, 17)); - - assertFalse(listState.get().iterator().hasNext()); - } - - /** - * Tests that on snapshots, states and offsets to commit to Kafka are correct - */ - @Test - public void checkUseFetcherWhenNoCheckpoint() throws Exception { - - FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true); - List<KafkaTopicPartition> partitionList = new ArrayList<>(1); - partitionList.add(new KafkaTopicPartition("test", 0)); - consumer.setSubscribedPartitions(partitionList); - - OperatorStateStore operatorStateStore = mock(OperatorStateStore.class); - TestingListState<Serializable> listState = new TestingListState<>(); - when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState); - - StateInitializationContext initializationContext = mock(StateInitializationContext.class); - - when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore); - - // make the context signal that there is no restored state, then validate that - when(initializationContext.isRestored()).thenReturn(false); - consumer.initializeState(initializationContext); - consumer.run(mock(SourceFunction.SourceContext.class)); - } - - @Test - @SuppressWarnings("unchecked") - public void testSnapshotState() throws Exception { - - // -------------------------------------------------------------------- - // prepare fake states - // -------------------------------------------------------------------- - - final HashMap<KafkaTopicPartition, Long> state1 = new HashMap<>(); - state1.put(new KafkaTopicPartition("abc", 13), 16768L); - state1.put(new KafkaTopicPartition("def", 7), 987654321L); - - final HashMap<KafkaTopicPartition, Long> state2 = new HashMap<>(); - state2.put(new KafkaTopicPartition("abc", 13), 16770L); - state2.put(new KafkaTopicPartition("def", 7), 987654329L); - - final HashMap<KafkaTopicPartition, Long> state3 = new HashMap<>(); - state3.put(new KafkaTopicPartition("abc", 13), 16780L); - state3.put(new KafkaTopicPartition("def", 7), 987654377L); - - // -------------------------------------------------------------------- - - final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class); - when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3); - - final LinkedMap pendingOffsetsToCommit = new LinkedMap(); - - FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, pendingOffsetsToCommit, true); - assertEquals(0, pendingOffsetsToCommit.size()); - - OperatorStateStore backend = mock(OperatorStateStore.class); - - TestingListState<Serializable> listState = new TestingListState<>(); - - when(backend.getSerializableListState(Matchers.any(String.class))).thenReturn(listState); - - StateInitializationContext initializationContext = mock(StateInitializationContext.class); - - when(initializationContext.getOperatorStateStore()).thenReturn(backend); - when(initializationContext.isRestored()).thenReturn(false, true, true, true); - - consumer.initializeState(initializationContext); - - // checkpoint 1 - consumer.snapshotState(new StateSnapshotContextSynchronousImpl(138, 138)); - - HashMap<KafkaTopicPartition, Long> snapshot1 = new HashMap<>(); - - for (Serializable serializable : listState.get()) { - Tuple2<KafkaTopicPartition, Long> kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) serializable; - snapshot1.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1); - } - - assertEquals(state1, snapshot1); - assertEquals(1, pendingOffsetsToCommit.size()); - assertEquals(state1, pendingOffsetsToCommit.get(138L)); - - // checkpoint 2 - consumer.snapshotState(new StateSnapshotContextSynchronousImpl(140, 140)); - - HashMap<KafkaTopicPartition, Long> snapshot2 = new HashMap<>(); - - for (Serializable serializable : listState.get()) { - Tuple2<KafkaTopicPartition, Long> kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) serializable; - snapshot2.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1); - } - - assertEquals(state2, snapshot2); - assertEquals(2, pendingOffsetsToCommit.size()); - assertEquals(state2, pendingOffsetsToCommit.get(140L)); - - // ack checkpoint 1 - consumer.notifyCheckpointComplete(138L); - assertEquals(1, pendingOffsetsToCommit.size()); - assertTrue(pendingOffsetsToCommit.containsKey(140L)); - - // checkpoint 3 - consumer.snapshotState(new StateSnapshotContextSynchronousImpl(141, 141)); - - HashMap<KafkaTopicPartition, Long> snapshot3 = new HashMap<>(); - - for (Serializable serializable : listState.get()) { - Tuple2<KafkaTopicPartition, Long> kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) serializable; - snapshot3.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1); - } - - assertEquals(state3, snapshot3); - assertEquals(2, pendingOffsetsToCommit.size()); - assertEquals(state3, pendingOffsetsToCommit.get(141L)); - - // ack checkpoint 3, subsumes number 2 - consumer.notifyCheckpointComplete(141L); - assertEquals(0, pendingOffsetsToCommit.size()); - - - consumer.notifyCheckpointComplete(666); // invalid checkpoint - assertEquals(0, pendingOffsetsToCommit.size()); - - OperatorStateStore operatorStateStore = mock(OperatorStateStore.class); - listState = new TestingListState<>(); - when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState); - - // create 500 snapshots - for (int i = 100; i < 600; i++) { - consumer.snapshotState(new StateSnapshotContextSynchronousImpl(i, i)); - listState.clear(); - } - assertEquals(FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS, pendingOffsetsToCommit.size()); - - // commit only the second last - consumer.notifyCheckpointComplete(598); - assertEquals(1, pendingOffsetsToCommit.size()); - - // access invalid checkpoint - consumer.notifyCheckpointComplete(590); - - // and the last - consumer.notifyCheckpointComplete(599); - assertEquals(0, pendingOffsetsToCommit.size()); - } - - // ------------------------------------------------------------------------ - - private static <T> FlinkKafkaConsumerBase<T> getConsumer( - AbstractFetcher<T, ?> fetcher, LinkedMap pendingOffsetsToCommit, boolean running) throws Exception - { - FlinkKafkaConsumerBase<T> consumer = new DummyFlinkKafkaConsumer<>(); - - Field fetcherField = FlinkKafkaConsumerBase.class.getDeclaredField("kafkaFetcher"); - fetcherField.setAccessible(true); - fetcherField.set(consumer, fetcher); - - Field mapField = FlinkKafkaConsumerBase.class.getDeclaredField("pendingOffsetsToCommit"); - mapField.setAccessible(true); - mapField.set(consumer, pendingOffsetsToCommit); - - Field runningField = FlinkKafkaConsumerBase.class.getDeclaredField("running"); - runningField.setAccessible(true); - runningField.set(consumer, running); - - return consumer; - } - - // ------------------------------------------------------------------------ - - private static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> { - private static final long serialVersionUID = 1L; - - @SuppressWarnings("unchecked") - public DummyFlinkKafkaConsumer() { - super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema < T >) mock(KeyedDeserializationSchema.class)); - } - - @Override - protected AbstractFetcher<T, ?> createFetcher(SourceContext<T> sourceContext, List<KafkaTopicPartition> thisSubtaskPartitions, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext) throws Exception { - AbstractFetcher<T, ?> fetcher = mock(AbstractFetcher.class); - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - Assert.fail("Trying to restore offsets even though there was no restore state."); - return null; - } - }).when(fetcher).restoreOffsets(any(HashMap.class)); - return fetcher; - } - - @Override - protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) { - return Collections.emptyList(); - } - - @Override - public RuntimeContext getRuntimeContext() { - return mock(StreamingRuntimeContext.class); - } - } - - private static final class TestingListState<T> implements ListState<T> { - - private final List<T> list = new ArrayList<>(); - private boolean clearCalled = false; - - @Override - public void clear() { - list.clear(); - clearCalled = true; - } - - @Override - public Iterable<T> get() throws Exception { - return list; - } - - @Override - public void add(T value) throws Exception { - list.add(value); - } - - public List<T> getList() { - return list; - } - - public boolean isClearCalled() { - return clearCalled; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java deleted file mode 100644 index 2e06160..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java +++ /dev/null @@ -1,288 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.streaming.api.operators.StreamSink; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; -import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.junit.Assert; -import org.junit.Test; -import scala.concurrent.duration.Deadline; -import scala.concurrent.duration.FiniteDuration; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class FlinkKafkaProducerBaseTest { - - /** - * Tests that the constructor eagerly checks bootstrap servers are set in config - */ - @Test(expected = IllegalArgumentException.class) - public void testInstantiationFailsWhenBootstrapServersMissing() throws Exception { - // no bootstrap servers set in props - Properties props = new Properties(); - // should throw IllegalArgumentException - new DummyFlinkKafkaProducer<>(props, null); - } - - /** - * Tests that constructor defaults to key value serializers in config to byte array deserializers if not set - */ - @Test - public void testKeyValueDeserializersSetIfMissing() throws Exception { - Properties props = new Properties(); - props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345"); - // should set missing key value deserializers - new DummyFlinkKafkaProducer<>(props, null); - - assertTrue(props.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)); - assertTrue(props.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)); - assertTrue(props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).equals(ByteArraySerializer.class.getCanonicalName())); - assertTrue(props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).equals(ByteArraySerializer.class.getCanonicalName())); - } - - /** - * Tests that partitions list is determinate and correctly provided to custom partitioner - */ - @Test - public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception { - KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class); - RuntimeContext mockRuntimeContext = mock(RuntimeContext.class); - when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0); - when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1); - - DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer( - FakeStandardProducerConfig.get(), mockPartitioner); - producer.setRuntimeContext(mockRuntimeContext); - - producer.open(new Configuration()); - - // the internal mock KafkaProducer will return an out-of-order list of 4 partitions, - // which should be sorted before provided to the custom partitioner's open() method - int[] correctPartitionList = {0, 1, 2, 3}; - verify(mockPartitioner).open(0, 1, correctPartitionList); - } - - /** - * Test ensuring that the producer is not dropping buffered records.; - * we set a timeout because the test will not finish if the logic is broken - */ - @Test(timeout=5000) - public void testAtLeastOnceProducer() throws Throwable { - runAtLeastOnceTest(true); - } - - /** - * Ensures that the at least once producing test fails if the flushing is disabled - */ - @Test(expected = AssertionError.class, timeout=5000) - public void testAtLeastOnceProducerFailsIfFlushingDisabled() throws Throwable { - runAtLeastOnceTest(false); - } - - private void runAtLeastOnceTest(boolean flushOnCheckpoint) throws Throwable { - final AtomicBoolean snapshottingFinished = new AtomicBoolean(false); - final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>( - FakeStandardProducerConfig.get(), null, snapshottingFinished); - producer.setFlushOnCheckpoint(flushOnCheckpoint); - - OneInputStreamOperatorTestHarness<String, Object> testHarness = - new OneInputStreamOperatorTestHarness<>(new StreamSink(producer)); - - testHarness.open(); - - for (int i = 0; i < 100; i++) { - testHarness.processElement(new StreamRecord<>("msg-" + i)); - } - - // start a thread confirming all pending records - final Tuple1<Throwable> runnableError = new Tuple1<>(null); - final Thread threadA = Thread.currentThread(); - - Runnable confirmer = new Runnable() { - @Override - public void run() { - try { - MockProducer mp = producer.getProducerInstance(); - List<Callback> pending = mp.getPending(); - - // we need to find out if the snapshot() method blocks forever - // this is not possible. If snapshot() is running, it will - // start removing elements from the pending list. - synchronized (threadA) { - threadA.wait(500L); - } - // we now check that no records have been confirmed yet - Assert.assertEquals(100, pending.size()); - Assert.assertFalse("Snapshot method returned before all records were confirmed", - snapshottingFinished.get()); - - // now confirm all checkpoints - for (Callback c: pending) { - c.onCompletion(null, null); - } - pending.clear(); - } catch(Throwable t) { - runnableError.f0 = t; - } - } - }; - Thread threadB = new Thread(confirmer); - threadB.start(); - - // this should block: - testHarness.snapshot(0, 0); - - synchronized (threadA) { - threadA.notifyAll(); // just in case, to let the test fail faster - } - Assert.assertEquals(0, producer.getProducerInstance().getPending().size()); - Deadline deadline = FiniteDuration.apply(5, "s").fromNow(); - while (deadline.hasTimeLeft() && threadB.isAlive()) { - threadB.join(500); - } - Assert.assertFalse("Thread A is expected to be finished at this point. If not, the test is prone to fail", threadB.isAlive()); - if (runnableError.f0 != null) { - throw runnableError.f0; - } - - testHarness.close(); - } - - - // ------------------------------------------------------------------------ - - private static class DummyFlinkKafkaProducer<T> extends FlinkKafkaProducerBase<T> { - private static final long serialVersionUID = 1L; - - private transient MockProducer prod; - private AtomicBoolean snapshottingFinished; - - @SuppressWarnings("unchecked") - public DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner, AtomicBoolean snapshottingFinished) { - super("dummy-topic", (KeyedSerializationSchema< T >) mock(KeyedSerializationSchema.class), producerConfig, partitioner); - this.snapshottingFinished = snapshottingFinished; - } - - // constructor variant for test irrelated to snapshotting - @SuppressWarnings("unchecked") - public DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner) { - super("dummy-topic", (KeyedSerializationSchema< T >) mock(KeyedSerializationSchema.class), producerConfig, partitioner); - this.snapshottingFinished = new AtomicBoolean(true); - } - - @Override - protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props) { - this.prod = new MockProducer(); - return this.prod; - } - - @Override - public void snapshotState(FunctionSnapshotContext ctx) throws Exception { - // call the actual snapshot state - super.snapshotState(ctx); - // notify test that snapshotting has been done - snapshottingFinished.set(true); - } - - @Override - protected void flush() { - this.prod.flush(); - } - - public MockProducer getProducerInstance() { - return this.prod; - } - } - - private static class MockProducer<K, V> extends KafkaProducer<K, V> { - List<Callback> pendingCallbacks = new ArrayList<>(); - - public MockProducer() { - super(FakeStandardProducerConfig.get()); - } - - @Override - public Future<RecordMetadata> send(ProducerRecord<K, V> record) { - throw new UnsupportedOperationException("Unexpected"); - } - - @Override - public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { - pendingCallbacks.add(callback); - return null; - } - - @Override - public List<PartitionInfo> partitionsFor(String topic) { - List<PartitionInfo> list = new ArrayList<>(); - // deliberately return an out-of-order partition list - list.add(new PartitionInfo(topic, 3, null, null, null)); - list.add(new PartitionInfo(topic, 1, null, null, null)); - list.add(new PartitionInfo(topic, 0, null, null, null)); - list.add(new PartitionInfo(topic, 2, null, null, null)); - return list; - } - - @Override - public Map<MetricName, ? extends Metric> metrics() { - return null; - } - - - public List<Callback> getPending() { - return this.pendingCallbacks; - } - - public void flush() { - while (pendingCallbacks.size() > 0) { - try { - Thread.sleep(10); - } catch (InterruptedException e) { - throw new RuntimeException("Unable to flush producer, task was interrupted"); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java deleted file mode 100644 index 1882a7e..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kafka; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; - -public class JSONDeserializationSchemaTest { - @Test - public void testDeserialize() throws IOException { - ObjectMapper mapper = new ObjectMapper(); - ObjectNode initialValue = mapper.createObjectNode(); - initialValue.put("key", 4).put("value", "world"); - byte[] serializedValue = mapper.writeValueAsBytes(initialValue); - - JSONDeserializationSchema schema = new JSONDeserializationSchema(); - ObjectNode deserializedValue = schema.deserialize(serializedValue); - - Assert.assertEquals(4, deserializedValue.get("key").asInt()); - Assert.assertEquals("world", deserializedValue.get("value").asText()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java deleted file mode 100644 index 86d3105..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kafka; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; - -public class JSONKeyValueDeserializationSchemaTest { - @Test - public void testDeserializeWithoutMetadata() throws IOException { - ObjectMapper mapper = new ObjectMapper(); - ObjectNode initialKey = mapper.createObjectNode(); - initialKey.put("index", 4); - byte[] serializedKey = mapper.writeValueAsBytes(initialKey); - - ObjectNode initialValue = mapper.createObjectNode(); - initialValue.put("word", "world"); - byte[] serializedValue = mapper.writeValueAsBytes(initialValue); - - JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(false); - ObjectNode deserializedValue = schema.deserialize(serializedKey, serializedValue, "", 0, 0); - - - Assert.assertTrue(deserializedValue.get("metadata") == null); - Assert.assertEquals(4, deserializedValue.get("key").get("index").asInt()); - Assert.assertEquals("world", deserializedValue.get("value").get("word").asText()); - } - - @Test - public void testDeserializeWithMetadata() throws IOException { - ObjectMapper mapper = new ObjectMapper(); - ObjectNode initialKey = mapper.createObjectNode(); - initialKey.put("index", 4); - byte[] serializedKey = mapper.writeValueAsBytes(initialKey); - - ObjectNode initialValue = mapper.createObjectNode(); - initialValue.put("word", "world"); - byte[] serializedValue = mapper.writeValueAsBytes(initialValue); - - JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(true); - ObjectNode deserializedValue = schema.deserialize(serializedKey, serializedValue, "topic#1", 3, 4); - - Assert.assertEquals(4, deserializedValue.get("key").get("index").asInt()); - Assert.assertEquals("world", deserializedValue.get("value").get("word").asText()); - Assert.assertEquals("topic#1", deserializedValue.get("metadata").get("topic").asText()); - Assert.assertEquals(4, deserializedValue.get("metadata").get("offset").asInt()); - Assert.assertEquals(3, deserializedValue.get("metadata").get("partition").asInt()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java deleted file mode 100644 index 68225e2..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.flink.api.table.Row; -import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; -import org.junit.Test; - -import java.io.IOException; -import java.util.concurrent.ThreadLocalRandom; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -public class JsonRowDeserializationSchemaTest { - - /** - * Tests simple deserialization. - */ - @Test - public void testDeserialization() throws Exception { - long id = 1238123899121L; - String name = "asdlkjasjkdla998y1122"; - byte[] bytes = new byte[1024]; - ThreadLocalRandom.current().nextBytes(bytes); - - ObjectMapper objectMapper = new ObjectMapper(); - - // Root - ObjectNode root = objectMapper.createObjectNode(); - root.put("id", id); - root.put("name", name); - root.put("bytes", bytes); - - byte[] serializedJson = objectMapper.writeValueAsBytes(root); - - JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema( - new String[] { "id", "name", "bytes" }, - new Class<?>[] { Long.class, String.class, byte[].class }); - - Row deserialized = deserializationSchema.deserialize(serializedJson); - - assertEquals(3, deserialized.productArity()); - assertEquals(id, deserialized.productElement(0)); - assertEquals(name, deserialized.productElement(1)); - assertArrayEquals(bytes, (byte[]) deserialized.productElement(2)); - } - - /** - * Tests deserialization with non-existing field name. - */ - @Test - public void testMissingNode() throws Exception { - ObjectMapper objectMapper = new ObjectMapper(); - - // Root - ObjectNode root = objectMapper.createObjectNode(); - root.put("id", 123123123); - byte[] serializedJson = objectMapper.writeValueAsBytes(root); - - JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema( - new String[] { "name" }, - new Class<?>[] { String.class }); - - Row row = deserializationSchema.deserialize(serializedJson); - - assertEquals(1, row.productArity()); - assertNull("Missing field not null", row.productElement(0)); - - deserializationSchema.setFailOnMissingField(true); - - try { - deserializationSchema.deserialize(serializedJson); - fail("Did not throw expected Exception"); - } catch (IOException e) { - assertTrue(e.getCause() instanceof IllegalStateException); - } - } - - /** - * Tests that number of field names and types has to match. - */ - @Test - public void testNumberOfFieldNamesAndTypesMismatch() throws Exception { - try { - new JsonRowDeserializationSchema( - new String[] { "one", "two", "three" }, - new Class<?>[] { Long.class }); - fail("Did not throw expected Exception"); - } catch (IllegalArgumentException ignored) { - // Expected - } - - try { - new JsonRowDeserializationSchema( - new String[] { "one" }, - new Class<?>[] { Long.class, String.class }); - fail("Did not throw expected Exception"); - } catch (IllegalArgumentException ignored) { - // Expected - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java deleted file mode 100644 index 92af15d..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.table.Row; -import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; -import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; -import org.junit.Test; - -import java.io.IOException; - -import static org.junit.Assert.assertEquals; - -public class JsonRowSerializationSchemaTest { - @Test - public void testRowSerialization() throws IOException { - String[] fieldNames = new String[] {"f1", "f2", "f3"}; - Class[] fieldTypes = new Class[] {Integer.class, Boolean.class, String.class}; - Row row = new Row(3); - row.setField(0, 1); - row.setField(1, true); - row.setField(2, "str"); - - Row resultRow = serializeAndDeserialize(fieldNames, fieldTypes, row); - assertEqualRows(row, resultRow); - } - - @Test - public void testSerializationOfTwoRows() throws IOException { - String[] fieldNames = new String[] {"f1", "f2", "f3"}; - Class[] fieldTypes = new Class[] {Integer.class, Boolean.class, String.class}; - Row row1 = new Row(3); - row1.setField(0, 1); - row1.setField(1, true); - row1.setField(2, "str"); - - JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames); - JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(fieldNames, fieldTypes); - - byte[] bytes = serializationSchema.serialize(row1); - assertEqualRows(row1, deserializationSchema.deserialize(bytes)); - - Row row2 = new Row(3); - row2.setField(0, 10); - row2.setField(1, false); - row2.setField(2, "newStr"); - - bytes = serializationSchema.serialize(row2); - assertEqualRows(row2, deserializationSchema.deserialize(bytes)); - } - - @Test(expected = NullPointerException.class) - public void testInputValidation() { - new JsonRowSerializationSchema(null); - } - - @Test(expected = IllegalStateException.class) - public void testSerializeRowWithInvalidNumberOfFields() { - String[] fieldNames = new String[] {"f1", "f2", "f3"}; - Row row = new Row(1); - row.setField(0, 1); - - JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames); - serializationSchema.serialize(row); - } - - private Row serializeAndDeserialize(String[] fieldNames, Class[] fieldTypes, Row row) throws IOException { - JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames); - JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(fieldNames, fieldTypes); - - byte[] bytes = serializationSchema.serialize(row); - return deserializationSchema.deserialize(bytes); - } - - private void assertEqualRows(Row expectedRow, Row resultRow) { - assertEquals("Deserialized row should have expected number of fields", - expectedRow.productArity(), resultRow.productArity()); - for (int i = 0; i < expectedRow.productArity(); i++) { - assertEquals(String.format("Field number %d should be as in the original row", i), - expectedRow.productElement(i), resultRow.productElement(i)); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java deleted file mode 100644 index 9beed22..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; - -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import static org.junit.Assert.*; - -/** - * Tests that the partition assignment is deterministic and stable. - */ -public class KafkaConsumerPartitionAssignmentTest { - - @Test - public void testPartitionsEqualConsumers() { - try { - List<KafkaTopicPartition> inPartitions = Arrays.asList( - new KafkaTopicPartition("test-topic", 4), - new KafkaTopicPartition("test-topic", 52), - new KafkaTopicPartition("test-topic", 17), - new KafkaTopicPartition("test-topic", 1)); - - for (int i = 0; i < inPartitions.size(); i++) { - List<KafkaTopicPartition> parts = - FlinkKafkaConsumerBase.assignPartitions(inPartitions, inPartitions.size(), i); - - assertNotNull(parts); - assertEquals(1, parts.size()); - assertTrue(contains(inPartitions, parts.get(0).getPartition())); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - private boolean contains(List<KafkaTopicPartition> inPartitions, int partition) { - for (KafkaTopicPartition ktp : inPartitions) { - if (ktp.getPartition() == partition) { - return true; - } - } - return false; - } - - @Test - public void testMultiplePartitionsPerConsumers() { - try { - final int[] partitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14}; - - final List<KafkaTopicPartition> partitions = new ArrayList<>(); - final Set<KafkaTopicPartition> allPartitions = new HashSet<>(); - - for (int p : partitionIDs) { - KafkaTopicPartition part = new KafkaTopicPartition("test-topic", p); - partitions.add(part); - allPartitions.add(part); - } - - final int numConsumers = 3; - final int minPartitionsPerConsumer = partitions.size() / numConsumers; - final int maxPartitionsPerConsumer = partitions.size() / numConsumers + 1; - - for (int i = 0; i < numConsumers; i++) { - List<KafkaTopicPartition> parts = - FlinkKafkaConsumerBase.assignPartitions(partitions, numConsumers, i); - - assertNotNull(parts); - assertTrue(parts.size() >= minPartitionsPerConsumer); - assertTrue(parts.size() <= maxPartitionsPerConsumer); - - for (KafkaTopicPartition p : parts) { - // check that the element was actually contained - assertTrue(allPartitions.remove(p)); - } - } - - // all partitions must have been assigned - assertTrue(allPartitions.isEmpty()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testPartitionsFewerThanConsumers() { - try { - List<KafkaTopicPartition> inPartitions = Arrays.asList( - new KafkaTopicPartition("test-topic", 4), - new KafkaTopicPartition("test-topic", 52), - new KafkaTopicPartition("test-topic", 17), - new KafkaTopicPartition("test-topic", 1)); - - final Set<KafkaTopicPartition> allPartitions = new HashSet<>(); - allPartitions.addAll(inPartitions); - - final int numConsumers = 2 * inPartitions.size() + 3; - - for (int i = 0; i < numConsumers; i++) { - List<KafkaTopicPartition> parts = FlinkKafkaConsumerBase.assignPartitions(inPartitions, numConsumers, i); - - assertNotNull(parts); - assertTrue(parts.size() <= 1); - - for (KafkaTopicPartition p : parts) { - // check that the element was actually contained - assertTrue(allPartitions.remove(p)); - } - } - - // all partitions must have been assigned - assertTrue(allPartitions.isEmpty()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testAssignEmptyPartitions() { - try { - List<KafkaTopicPartition> ep = new ArrayList<>(); - List<KafkaTopicPartition> parts1 = FlinkKafkaConsumerBase.assignPartitions(ep, 4, 2); - assertNotNull(parts1); - assertTrue(parts1.isEmpty()); - - List<KafkaTopicPartition> parts2 = FlinkKafkaConsumerBase.assignPartitions(ep, 1, 0); - assertNotNull(parts2); - assertTrue(parts2.isEmpty()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testGrowingPartitionsRemainsStable() { - try { - final int[] newPartitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14}; - List<KafkaTopicPartition> newPartitions = new ArrayList<>(); - - for (int p : newPartitionIDs) { - KafkaTopicPartition part = new KafkaTopicPartition("test-topic", p); - newPartitions.add(part); - } - - List<KafkaTopicPartition> initialPartitions = newPartitions.subList(0, 7); - - final Set<KafkaTopicPartition> allNewPartitions = new HashSet<>(newPartitions); - final Set<KafkaTopicPartition> allInitialPartitions = new HashSet<>(initialPartitions); - - final int numConsumers = 3; - final int minInitialPartitionsPerConsumer = initialPartitions.size() / numConsumers; - final int maxInitialPartitionsPerConsumer = initialPartitions.size() / numConsumers + 1; - final int minNewPartitionsPerConsumer = newPartitions.size() / numConsumers; - final int maxNewPartitionsPerConsumer = newPartitions.size() / numConsumers + 1; - - List<KafkaTopicPartition> parts1 = FlinkKafkaConsumerBase.assignPartitions( - initialPartitions, numConsumers, 0); - List<KafkaTopicPartition> parts2 = FlinkKafkaConsumerBase.assignPartitions( - initialPartitions, numConsumers, 1); - List<KafkaTopicPartition> parts3 = FlinkKafkaConsumerBase.assignPartitions( - initialPartitions, numConsumers, 2); - - assertNotNull(parts1); - assertNotNull(parts2); - assertNotNull(parts3); - - assertTrue(parts1.size() >= minInitialPartitionsPerConsumer); - assertTrue(parts1.size() <= maxInitialPartitionsPerConsumer); - assertTrue(parts2.size() >= minInitialPartitionsPerConsumer); - assertTrue(parts2.size() <= maxInitialPartitionsPerConsumer); - assertTrue(parts3.size() >= minInitialPartitionsPerConsumer); - assertTrue(parts3.size() <= maxInitialPartitionsPerConsumer); - - for (KafkaTopicPartition p : parts1) { - // check that the element was actually contained - assertTrue(allInitialPartitions.remove(p)); - } - for (KafkaTopicPartition p : parts2) { - // check that the element was actually contained - assertTrue(allInitialPartitions.remove(p)); - } - for (KafkaTopicPartition p : parts3) { - // check that the element was actually contained - assertTrue(allInitialPartitions.remove(p)); - } - - // all partitions must have been assigned - assertTrue(allInitialPartitions.isEmpty()); - - // grow the set of partitions and distribute anew - - List<KafkaTopicPartition> parts1new = FlinkKafkaConsumerBase.assignPartitions( - newPartitions, numConsumers, 0); - List<KafkaTopicPartition> parts2new = FlinkKafkaConsumerBase.assignPartitions( - newPartitions, numConsumers, 1); - List<KafkaTopicPartition> parts3new = FlinkKafkaConsumerBase.assignPartitions( - newPartitions, numConsumers, 2); - - // new partitions must include all old partitions - - assertTrue(parts1new.size() > parts1.size()); - assertTrue(parts2new.size() > parts2.size()); - assertTrue(parts3new.size() > parts3.size()); - - assertTrue(parts1new.containsAll(parts1)); - assertTrue(parts2new.containsAll(parts2)); - assertTrue(parts3new.containsAll(parts3)); - - assertTrue(parts1new.size() >= minNewPartitionsPerConsumer); - assertTrue(parts1new.size() <= maxNewPartitionsPerConsumer); - assertTrue(parts2new.size() >= minNewPartitionsPerConsumer); - assertTrue(parts2new.size() <= maxNewPartitionsPerConsumer); - assertTrue(parts3new.size() >= minNewPartitionsPerConsumer); - assertTrue(parts3new.size() <= maxNewPartitionsPerConsumer); - - for (KafkaTopicPartition p : parts1new) { - // check that the element was actually contained - assertTrue(allNewPartitions.remove(p)); - } - for (KafkaTopicPartition p : parts2new) { - // check that the element was actually contained - assertTrue(allNewPartitions.remove(p)); - } - for (KafkaTopicPartition p : parts3new) { - // check that the element was actually contained - assertTrue(allNewPartitions.remove(p)); - } - - // all partitions must have been assigned - assertTrue(allNewPartitions.isEmpty()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - -}
