[hotfix][kafka-tests] Fix test names so that they are not ignored by mvn build
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/856b6baf Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/856b6baf Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/856b6baf Branch: refs/heads/master Commit: 856b6baf1672ac0a9eaedc56cb18562e934ebac3 Parents: 872c35e Author: Piotr Nowojski <[email protected]> Authored: Fri Oct 27 15:11:24 2017 +0200 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Thu Nov 2 12:43:20 2017 +0800 ---------------------------------------------------------------------- .../kafka/FlinkKafkaProducer011Test.java | 519 +++++++++++++++++++ .../kafka/FlinkKafkaProducer011Tests.java | 519 ------------------- .../kafka/FlinkKafkaProducerTest.java | 114 ++++ .../kafka/FlinkKafkaProducerTests.java | 114 ---- 4 files changed, 633 insertions(+), 633 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/856b6baf/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Test.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Test.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Test.java new file mode 100644 index 0000000..1b87ff7 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Test.java @@ -0,0 +1,519 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; + +import kafka.server.KafkaServer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.util.Preconditions.checkState; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +/** + * IT cases for the {@link FlinkKafkaProducer011}. + */ +@SuppressWarnings("serial") +public class FlinkKafkaProducer011Test extends KafkaTestBase { + protected String transactionalId; + protected Properties extraProperties; + + protected TypeInformationSerializationSchema<Integer> integerSerializationSchema = + new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig()); + protected KeyedSerializationSchema<Integer> integerKeyedSerializationSchema = + new KeyedSerializationSchemaWrapper<>(integerSerializationSchema); + + @Before + public void before() { + transactionalId = UUID.randomUUID().toString(); + extraProperties = new Properties(); + extraProperties.putAll(standardProps); + extraProperties.put("transactional.id", transactionalId); + extraProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + extraProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + extraProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + extraProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + extraProperties.put("isolation.level", "read_committed"); + } + + @Test(timeout = 120_000L) + public void testFlinkKafkaProducer011FailBeforeNotify() throws Exception { + String topic = "flink-kafka-producer-fail-before-notify"; + + OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic); + + testHarness.setup(); + testHarness.open(); + testHarness.initializeState(null); + testHarness.processElement(42, 0); + testHarness.snapshot(0, 1); + testHarness.processElement(43, 2); + OperatorStateHandles snapshot = testHarness.snapshot(1, 3); + + int leaderId = kafkaServer.getLeaderToShutDown(topic); + failBroker(leaderId); + + try { + testHarness.processElement(44, 4); + testHarness.snapshot(2, 5); + assertFalse(true); + } + catch (Exception ex) { + // expected + } + try { + testHarness.close(); + } + catch (Exception ex) { + } + + kafkaServer.restartBroker(leaderId); + + testHarness = createTestHarness(topic); + testHarness.setup(); + testHarness.initializeState(snapshot); + testHarness.close(); + + assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43), 30_000L); + + deleteTestTopic(topic); + } + + @Test(timeout = 120_000L) + public void testFlinkKafkaProducer011FailTransactionCoordinatorBeforeNotify() throws Exception { + String topic = "flink-kafka-producer-fail-transaction-coordinator-before-notify"; + + Properties properties = createProperties(); + + FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>( + topic, + integerKeyedSerializationSchema, + properties, + FlinkKafkaProducer011.Semantic.EXACTLY_ONCE); + + OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 = new OneInputStreamOperatorTestHarness<>( + new StreamSink<>(kafkaProducer), + IntSerializer.INSTANCE); + + testHarness1.setup(); + testHarness1.open(); + testHarness1.initializeState(null); + testHarness1.processElement(42, 0); + testHarness1.snapshot(0, 1); + testHarness1.processElement(43, 2); + int transactionCoordinatorId = kafkaProducer.getTransactionCoordinatorId(); + OperatorStateHandles snapshot = testHarness1.snapshot(1, 3); + + failBroker(transactionCoordinatorId); + + try { + testHarness1.processElement(44, 4); + testHarness1.notifyOfCompletedCheckpoint(1); + testHarness1.close(); + } + catch (Exception ex) { + // Expected... some random exception could be thrown by any of the above operations. + } + finally { + kafkaServer.restartBroker(transactionCoordinatorId); + } + + try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness2 = createTestHarness(topic)) { + testHarness2.setup(); + testHarness2.initializeState(snapshot); + testHarness2.open(); + } + + assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43), 30_000L); + + deleteTestTopic(topic); + } + + /** + * This tests checks whether FlinkKafkaProducer011 correctly aborts lingering transactions after a failure. + * If such transactions were left alone lingering it consumers would be unable to read committed records + * that were created after this lingering transaction. + */ + @Test(timeout = 120_000L) + public void testFailBeforeNotifyAndResumeWorkAfterwards() throws Exception { + String topic = "flink-kafka-producer-fail-before-notify"; + + OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic); + + testHarness.setup(); + testHarness.open(); + testHarness.processElement(42, 0); + testHarness.snapshot(0, 1); + testHarness.processElement(43, 2); + OperatorStateHandles snapshot1 = testHarness.snapshot(1, 3); + + testHarness.processElement(44, 4); + testHarness.snapshot(2, 5); + testHarness.processElement(45, 6); + + // do not close previous testHarness to make sure that closing do not clean up something (in case of failure + // there might not be any close) + testHarness = createTestHarness(topic); + testHarness.setup(); + // restore from snapshot1, transactions with records 44 and 45 should be aborted + testHarness.initializeState(snapshot1); + testHarness.open(); + + // write and commit more records, after potentially lingering transactions + testHarness.processElement(46, 7); + testHarness.snapshot(4, 8); + testHarness.processElement(47, 9); + testHarness.notifyOfCompletedCheckpoint(4); + + //now we should have: + // - records 42 and 43 in committed transactions + // - aborted transactions with records 44 and 45 + // - committed transaction with record 46 + // - pending transaction with record 47 + assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43, 46), 30_000L); + + testHarness.close(); + deleteTestTopic(topic); + } + + /** + * This tests checks whether FlinkKafkaProducer011 correctly aborts lingering transactions after a failure, + * which happened before first checkpoint and was followed up by reducing the parallelism. + * If such transactions were left alone lingering it consumers would be unable to read committed records + * that were created after this lingering transaction. + */ + @Test(timeout = 120_000L) + public void testScaleDownBeforeFirstCheckpoint() throws Exception { + String topic = "scale-down-before-first-checkpoint"; + + List<AutoCloseable> operatorsToClose = new ArrayList<>(); + int preScaleDownParallelism = Math.max(2, FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR); + for (int subtaskIndex = 0; subtaskIndex < preScaleDownParallelism; subtaskIndex++) { + OneInputStreamOperatorTestHarness<Integer, Object> preScaleDownOperator = createTestHarness( + topic, + preScaleDownParallelism, + preScaleDownParallelism, + subtaskIndex); + + preScaleDownOperator.setup(); + preScaleDownOperator.open(); + preScaleDownOperator.processElement(subtaskIndex * 2, 0); + preScaleDownOperator.snapshot(0, 1); + preScaleDownOperator.processElement(subtaskIndex * 2 + 1, 2); + + operatorsToClose.add(preScaleDownOperator); + } + + // do not close previous testHarnesses to make sure that closing do not clean up something (in case of failure + // there might not be any close) + + // After previous failure simulate restarting application with smaller parallelism + OneInputStreamOperatorTestHarness<Integer, Object> postScaleDownOperator1 = createTestHarness(topic, 1, 1, 0); + + postScaleDownOperator1.setup(); + postScaleDownOperator1.open(); + + // write and commit more records, after potentially lingering transactions + postScaleDownOperator1.processElement(46, 7); + postScaleDownOperator1.snapshot(4, 8); + postScaleDownOperator1.processElement(47, 9); + postScaleDownOperator1.notifyOfCompletedCheckpoint(4); + + //now we should have: + // - records 42, 43, 44 and 45 in aborted transactions + // - committed transaction with record 46 + // - pending transaction with record 47 + assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(46), 30_000L); + + postScaleDownOperator1.close(); + // ignore ProducerFencedExceptions, because postScaleDownOperator1 could reuse transactional ids. + for (AutoCloseable operatorToClose : operatorsToClose) { + closeIgnoringProducerFenced(operatorToClose); + } + deleteTestTopic(topic); + } + + /** + * Each instance of FlinkKafkaProducer011 uses it's own pool of transactional ids. After the restore from checkpoint + * transactional ids are redistributed across the subtasks. In case of scale down, the surplus transactional ids + * are dropped. In case of scale up, new one are generated (for the new subtasks). This test make sure that sequence + * of scaling down and up again works fine. Especially it checks whether the newly generated ids in scaling up + * do not overlap with ids that were used before scaling down. For example we start with 4 ids and parallelism 4: + * [1], [2], [3], [4] - one assigned per each subtask + * we scale down to parallelism 2: + * [1, 2], [3, 4] - first subtask got id 1 and 2, second got ids 3 and 4 + * surplus ids are dropped from the pools and we scale up to parallelism 3: + * [1 or 2], [3 or 4], [???] + * new subtask have to generate new id(s), but he can not use ids that are potentially in use, so it has to generate + * new ones that are greater then 4. + */ + @Test(timeout = 120_000L) + public void testScaleUpAfterScalingDown() throws Exception { + String topic = "scale-down-before-first-checkpoint"; + + final int parallelism1 = 4; + final int parallelism2 = 2; + final int parallelism3 = 3; + final int maxParallelism = Math.max(parallelism1, Math.max(parallelism2, parallelism3)); + + List<OperatorStateHandle> operatorStateHandles = repartitionAndExecute( + topic, + Collections.emptyList(), + parallelism1, + maxParallelism, + IntStream.range(0, parallelism1).boxed().iterator()); + + operatorStateHandles = repartitionAndExecute( + topic, + operatorStateHandles, + parallelism2, + maxParallelism, + IntStream.range(parallelism1, parallelism1 + parallelism2).boxed().iterator()); + + operatorStateHandles = repartitionAndExecute( + topic, + operatorStateHandles, + parallelism3, + maxParallelism, + IntStream.range(parallelism1 + parallelism2, parallelism1 + parallelism2 + parallelism3).boxed().iterator()); + + // After each previous repartitionAndExecute call, we are left with some lingering transactions, that would + // not allow us to read all committed messages from the topic. Thus we initialize operators from + // operatorStateHandles once more, but without any new data. This should terminate all ongoing transactions. + + operatorStateHandles = repartitionAndExecute( + topic, + operatorStateHandles, + 1, + maxParallelism, + Collections.emptyIterator()); + + assertExactlyOnceForTopic( + createProperties(), + topic, + 0, + IntStream.range(0, parallelism1 + parallelism2 + parallelism3).boxed().collect(Collectors.toList()), + 30_000L); + deleteTestTopic(topic); + } + + private List<OperatorStateHandle> repartitionAndExecute( + String topic, + List<OperatorStateHandle> inputStates, + int parallelism, + int maxParallelism, + Iterator<Integer> inputData) throws Exception { + + List<OperatorStateHandle> outputStates = new ArrayList<>(); + List<OneInputStreamOperatorTestHarness<Integer, Object>> testHarnesses = new ArrayList<>(); + + for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) { + OneInputStreamOperatorTestHarness<Integer, Object> testHarness = + createTestHarness(topic, maxParallelism, parallelism, subtaskIndex); + testHarnesses.add(testHarness); + + testHarness.setup(); + + testHarness.initializeState(new OperatorStateHandles( + 0, + Collections.emptyList(), + Collections.emptyList(), + inputStates, + Collections.emptyList())); + testHarness.open(); + + if (inputData.hasNext()) { + int nextValue = inputData.next(); + testHarness.processElement(nextValue, 0); + OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + + outputStates.addAll(snapshot.getManagedOperatorState()); + checkState(snapshot.getRawOperatorState() == null, "Unexpected raw operator state"); + checkState(snapshot.getManagedKeyedState() == null, "Unexpected managed keyed state"); + checkState(snapshot.getRawKeyedState() == null, "Unexpected raw keyed state"); + + for (int i = 1; i < FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE - 1; i++) { + testHarness.processElement(-nextValue, 0); + testHarness.snapshot(i, 0); + } + } + } + + for (OneInputStreamOperatorTestHarness<Integer, Object> testHarness : testHarnesses) { + testHarness.close(); + } + + return outputStates; + } + + @Test + public void testRecoverCommittedTransaction() throws Exception { + String topic = "flink-kafka-producer-recover-committed-transaction"; + + OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic); + + testHarness.setup(); + testHarness.open(); // producerA - start transaction (txn) 0 + testHarness.processElement(42, 0); // producerA - write 42 in txn 0 + OperatorStateHandles checkpoint0 = testHarness.snapshot(0, 1); // producerA - pre commit txn 0, producerB - start txn 1 + testHarness.processElement(43, 2); // producerB - write 43 in txn 1 + testHarness.notifyOfCompletedCheckpoint(0); // producerA - commit txn 0 and return to the pool + testHarness.snapshot(1, 3); // producerB - pre txn 1, producerA - start txn 2 + testHarness.processElement(44, 4); // producerA - write 44 in txn 2 + testHarness.close(); // producerA - abort txn 2 + + testHarness = createTestHarness(topic); + testHarness.initializeState(checkpoint0); // recover state 0 - producerA recover and commit txn 0 + testHarness.close(); + + assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42), 30_000L); + + deleteTestTopic(topic); + } + + @Test + public void testRunOutOfProducersInThePool() throws Exception { + String topic = "flink-kafka-run-out-of-producers"; + + try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic)) { + + testHarness.setup(); + testHarness.open(); + + for (int i = 0; i < FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE * 2; i++) { + testHarness.processElement(i, i * 2); + testHarness.snapshot(i, i * 2 + 1); + } + } + catch (Exception ex) { + if (!ex.getCause().getMessage().startsWith("Too many ongoing")) { + throw ex; + } + } + deleteTestTopic(topic); + } + + // shut down a Kafka broker + private void failBroker(int brokerId) { + KafkaServer toShutDown = null; + for (KafkaServer server : kafkaServer.getBrokers()) { + + if (kafkaServer.getBrokerId(server) == brokerId) { + toShutDown = server; + break; + } + } + + if (toShutDown == null) { + StringBuilder listOfBrokers = new StringBuilder(); + for (KafkaServer server : kafkaServer.getBrokers()) { + listOfBrokers.append(kafkaServer.getBrokerId(server)); + listOfBrokers.append(" ; "); + } + + throw new IllegalArgumentException("Cannot find broker to shut down: " + brokerId + + " ; available brokers: " + listOfBrokers.toString()); + } else { + toShutDown.shutdown(); + toShutDown.awaitShutdown(); + } + } + + private void assertRecord(String topicName, String expectedKey, String expectedValue) { + try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(extraProperties)) { + kafkaConsumer.subscribe(Collections.singletonList(topicName)); + ConsumerRecords<String, String> records = kafkaConsumer.poll(10000); + + ConsumerRecord<String, String> record = Iterables.getOnlyElement(records); + assertEquals(expectedKey, record.key()); + assertEquals(expectedValue, record.value()); + } + } + + private void closeIgnoringProducerFenced(AutoCloseable autoCloseable) throws Exception { + try { + autoCloseable.close(); + } + catch (Exception ex) { + if (!(ex.getCause() instanceof ProducerFencedException)) { + throw ex; + } + } + } + + private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(String topic) throws Exception { + return createTestHarness(topic, 1, 1, 0); + } + + private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness( + String topic, + int maxParallelism, + int parallelism, + int subtaskIndex) throws Exception { + Properties properties = createProperties(); + + FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>( + topic, + integerKeyedSerializationSchema, + properties, + FlinkKafkaProducer011.Semantic.EXACTLY_ONCE); + + return new OneInputStreamOperatorTestHarness<>( + new StreamSink<>(kafkaProducer), + maxParallelism, + parallelism, + subtaskIndex, + IntSerializer.INSTANCE); + } + + private Properties createProperties() { + Properties properties = new Properties(); + properties.putAll(standardProps); + properties.putAll(secureProps); + properties.put(FlinkKafkaProducer011.KEY_DISABLE_METRICS, "true"); + return properties; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/856b6baf/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java deleted file mode 100644 index 381ba33..0000000 --- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java +++ /dev/null @@ -1,519 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.runtime.state.OperatorStateHandle; -import org.apache.flink.streaming.api.operators.StreamSink; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; - -import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; - -import kafka.server.KafkaServer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.errors.ProducerFencedException; -import org.junit.Before; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Properties; -import java.util.UUID; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import static org.apache.flink.util.Preconditions.checkState; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; - -/** - * IT cases for the {@link FlinkKafkaProducer011}. - */ -@SuppressWarnings("serial") -public class FlinkKafkaProducer011Tests extends KafkaTestBase { - protected String transactionalId; - protected Properties extraProperties; - - protected TypeInformationSerializationSchema<Integer> integerSerializationSchema = - new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig()); - protected KeyedSerializationSchema<Integer> integerKeyedSerializationSchema = - new KeyedSerializationSchemaWrapper<>(integerSerializationSchema); - - @Before - public void before() { - transactionalId = UUID.randomUUID().toString(); - extraProperties = new Properties(); - extraProperties.putAll(standardProps); - extraProperties.put("transactional.id", transactionalId); - extraProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - extraProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - extraProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - extraProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - extraProperties.put("isolation.level", "read_committed"); - } - - @Test(timeout = 120_000L) - public void testFlinkKafkaProducer011FailBeforeNotify() throws Exception { - String topic = "flink-kafka-producer-fail-before-notify"; - - OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic); - - testHarness.setup(); - testHarness.open(); - testHarness.initializeState(null); - testHarness.processElement(42, 0); - testHarness.snapshot(0, 1); - testHarness.processElement(43, 2); - OperatorStateHandles snapshot = testHarness.snapshot(1, 3); - - int leaderId = kafkaServer.getLeaderToShutDown(topic); - failBroker(leaderId); - - try { - testHarness.processElement(44, 4); - testHarness.snapshot(2, 5); - assertFalse(true); - } - catch (Exception ex) { - // expected - } - try { - testHarness.close(); - } - catch (Exception ex) { - } - - kafkaServer.restartBroker(leaderId); - - testHarness = createTestHarness(topic); - testHarness.setup(); - testHarness.initializeState(snapshot); - testHarness.close(); - - assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43), 30_000L); - - deleteTestTopic(topic); - } - - @Test(timeout = 120_000L) - public void testFlinkKafkaProducer011FailTransactionCoordinatorBeforeNotify() throws Exception { - String topic = "flink-kafka-producer-fail-transaction-coordinator-before-notify"; - - Properties properties = createProperties(); - - FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>( - topic, - integerKeyedSerializationSchema, - properties, - FlinkKafkaProducer011.Semantic.EXACTLY_ONCE); - - OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 = new OneInputStreamOperatorTestHarness<>( - new StreamSink<>(kafkaProducer), - IntSerializer.INSTANCE); - - testHarness1.setup(); - testHarness1.open(); - testHarness1.initializeState(null); - testHarness1.processElement(42, 0); - testHarness1.snapshot(0, 1); - testHarness1.processElement(43, 2); - int transactionCoordinatorId = kafkaProducer.getTransactionCoordinatorId(); - OperatorStateHandles snapshot = testHarness1.snapshot(1, 3); - - failBroker(transactionCoordinatorId); - - try { - testHarness1.processElement(44, 4); - testHarness1.notifyOfCompletedCheckpoint(1); - testHarness1.close(); - } - catch (Exception ex) { - // Expected... some random exception could be thrown by any of the above operations. - } - finally { - kafkaServer.restartBroker(transactionCoordinatorId); - } - - try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness2 = createTestHarness(topic)) { - testHarness2.setup(); - testHarness2.initializeState(snapshot); - testHarness2.open(); - } - - assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43), 30_000L); - - deleteTestTopic(topic); - } - - /** - * This tests checks whether FlinkKafkaProducer011 correctly aborts lingering transactions after a failure. - * If such transactions were left alone lingering it consumers would be unable to read committed records - * that were created after this lingering transaction. - */ - @Test(timeout = 120_000L) - public void testFailBeforeNotifyAndResumeWorkAfterwards() throws Exception { - String topic = "flink-kafka-producer-fail-before-notify"; - - OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic); - - testHarness.setup(); - testHarness.open(); - testHarness.processElement(42, 0); - testHarness.snapshot(0, 1); - testHarness.processElement(43, 2); - OperatorStateHandles snapshot1 = testHarness.snapshot(1, 3); - - testHarness.processElement(44, 4); - testHarness.snapshot(2, 5); - testHarness.processElement(45, 6); - - // do not close previous testHarness to make sure that closing do not clean up something (in case of failure - // there might not be any close) - testHarness = createTestHarness(topic); - testHarness.setup(); - // restore from snapshot1, transactions with records 44 and 45 should be aborted - testHarness.initializeState(snapshot1); - testHarness.open(); - - // write and commit more records, after potentially lingering transactions - testHarness.processElement(46, 7); - testHarness.snapshot(4, 8); - testHarness.processElement(47, 9); - testHarness.notifyOfCompletedCheckpoint(4); - - //now we should have: - // - records 42 and 43 in committed transactions - // - aborted transactions with records 44 and 45 - // - committed transaction with record 46 - // - pending transaction with record 47 - assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43, 46), 30_000L); - - testHarness.close(); - deleteTestTopic(topic); - } - - /** - * This tests checks whether FlinkKafkaProducer011 correctly aborts lingering transactions after a failure, - * which happened before first checkpoint and was followed up by reducing the parallelism. - * If such transactions were left alone lingering it consumers would be unable to read committed records - * that were created after this lingering transaction. - */ - @Test(timeout = 120_000L) - public void testScaleDownBeforeFirstCheckpoint() throws Exception { - String topic = "scale-down-before-first-checkpoint"; - - List<AutoCloseable> operatorsToClose = new ArrayList<>(); - int preScaleDownParallelism = Math.max(2, FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR); - for (int subtaskIndex = 0; subtaskIndex < preScaleDownParallelism; subtaskIndex++) { - OneInputStreamOperatorTestHarness<Integer, Object> preScaleDownOperator = createTestHarness( - topic, - preScaleDownParallelism, - preScaleDownParallelism, - subtaskIndex); - - preScaleDownOperator.setup(); - preScaleDownOperator.open(); - preScaleDownOperator.processElement(subtaskIndex * 2, 0); - preScaleDownOperator.snapshot(0, 1); - preScaleDownOperator.processElement(subtaskIndex * 2 + 1, 2); - - operatorsToClose.add(preScaleDownOperator); - } - - // do not close previous testHarnesses to make sure that closing do not clean up something (in case of failure - // there might not be any close) - - // After previous failure simulate restarting application with smaller parallelism - OneInputStreamOperatorTestHarness<Integer, Object> postScaleDownOperator1 = createTestHarness(topic, 1, 1, 0); - - postScaleDownOperator1.setup(); - postScaleDownOperator1.open(); - - // write and commit more records, after potentially lingering transactions - postScaleDownOperator1.processElement(46, 7); - postScaleDownOperator1.snapshot(4, 8); - postScaleDownOperator1.processElement(47, 9); - postScaleDownOperator1.notifyOfCompletedCheckpoint(4); - - //now we should have: - // - records 42, 43, 44 and 45 in aborted transactions - // - committed transaction with record 46 - // - pending transaction with record 47 - assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(46), 30_000L); - - postScaleDownOperator1.close(); - // ignore ProducerFencedExceptions, because postScaleDownOperator1 could reuse transactional ids. - for (AutoCloseable operatorToClose : operatorsToClose) { - closeIgnoringProducerFenced(operatorToClose); - } - deleteTestTopic(topic); - } - - /** - * Each instance of FlinkKafkaProducer011 uses it's own pool of transactional ids. After the restore from checkpoint - * transactional ids are redistributed across the subtasks. In case of scale down, the surplus transactional ids - * are dropped. In case of scale up, new one are generated (for the new subtasks). This test make sure that sequence - * of scaling down and up again works fine. Especially it checks whether the newly generated ids in scaling up - * do not overlap with ids that were used before scaling down. For example we start with 4 ids and parallelism 4: - * [1], [2], [3], [4] - one assigned per each subtask - * we scale down to parallelism 2: - * [1, 2], [3, 4] - first subtask got id 1 and 2, second got ids 3 and 4 - * surplus ids are dropped from the pools and we scale up to parallelism 3: - * [1 or 2], [3 or 4], [???] - * new subtask have to generate new id(s), but he can not use ids that are potentially in use, so it has to generate - * new ones that are greater then 4. - */ - @Test(timeout = 120_000L) - public void testScaleUpAfterScalingDown() throws Exception { - String topic = "scale-down-before-first-checkpoint"; - - final int parallelism1 = 4; - final int parallelism2 = 2; - final int parallelism3 = 3; - final int maxParallelism = Math.max(parallelism1, Math.max(parallelism2, parallelism3)); - - List<OperatorStateHandle> operatorStateHandles = repartitionAndExecute( - topic, - Collections.emptyList(), - parallelism1, - maxParallelism, - IntStream.range(0, parallelism1).boxed().iterator()); - - operatorStateHandles = repartitionAndExecute( - topic, - operatorStateHandles, - parallelism2, - maxParallelism, - IntStream.range(parallelism1, parallelism1 + parallelism2).boxed().iterator()); - - operatorStateHandles = repartitionAndExecute( - topic, - operatorStateHandles, - parallelism3, - maxParallelism, - IntStream.range(parallelism1 + parallelism2, parallelism1 + parallelism2 + parallelism3).boxed().iterator()); - - // After each previous repartitionAndExecute call, we are left with some lingering transactions, that would - // not allow us to read all committed messages from the topic. Thus we initialize operators from - // operatorStateHandles once more, but without any new data. This should terminate all ongoing transactions. - - operatorStateHandles = repartitionAndExecute( - topic, - operatorStateHandles, - 1, - maxParallelism, - Collections.emptyIterator()); - - assertExactlyOnceForTopic( - createProperties(), - topic, - 0, - IntStream.range(0, parallelism1 + parallelism2 + parallelism3).boxed().collect(Collectors.toList()), - 30_000L); - deleteTestTopic(topic); - } - - private List<OperatorStateHandle> repartitionAndExecute( - String topic, - List<OperatorStateHandle> inputStates, - int parallelism, - int maxParallelism, - Iterator<Integer> inputData) throws Exception { - - List<OperatorStateHandle> outputStates = new ArrayList<>(); - List<OneInputStreamOperatorTestHarness<Integer, Object>> testHarnesses = new ArrayList<>(); - - for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) { - OneInputStreamOperatorTestHarness<Integer, Object> testHarness = - createTestHarness(topic, maxParallelism, parallelism, subtaskIndex); - testHarnesses.add(testHarness); - - testHarness.setup(); - - testHarness.initializeState(new OperatorStateHandles( - 0, - Collections.emptyList(), - Collections.emptyList(), - inputStates, - Collections.emptyList())); - testHarness.open(); - - if (inputData.hasNext()) { - int nextValue = inputData.next(); - testHarness.processElement(nextValue, 0); - OperatorStateHandles snapshot = testHarness.snapshot(0, 0); - - outputStates.addAll(snapshot.getManagedOperatorState()); - checkState(snapshot.getRawOperatorState() == null, "Unexpected raw operator state"); - checkState(snapshot.getManagedKeyedState() == null, "Unexpected managed keyed state"); - checkState(snapshot.getRawKeyedState() == null, "Unexpected raw keyed state"); - - for (int i = 1; i < FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE - 1; i++) { - testHarness.processElement(-nextValue, 0); - testHarness.snapshot(i, 0); - } - } - } - - for (OneInputStreamOperatorTestHarness<Integer, Object> testHarness : testHarnesses) { - testHarness.close(); - } - - return outputStates; - } - - @Test - public void testRecoverCommittedTransaction() throws Exception { - String topic = "flink-kafka-producer-recover-committed-transaction"; - - OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic); - - testHarness.setup(); - testHarness.open(); // producerA - start transaction (txn) 0 - testHarness.processElement(42, 0); // producerA - write 42 in txn 0 - OperatorStateHandles checkpoint0 = testHarness.snapshot(0, 1); // producerA - pre commit txn 0, producerB - start txn 1 - testHarness.processElement(43, 2); // producerB - write 43 in txn 1 - testHarness.notifyOfCompletedCheckpoint(0); // producerA - commit txn 0 and return to the pool - testHarness.snapshot(1, 3); // producerB - pre txn 1, producerA - start txn 2 - testHarness.processElement(44, 4); // producerA - write 44 in txn 2 - testHarness.close(); // producerA - abort txn 2 - - testHarness = createTestHarness(topic); - testHarness.initializeState(checkpoint0); // recover state 0 - producerA recover and commit txn 0 - testHarness.close(); - - assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42), 30_000L); - - deleteTestTopic(topic); - } - - @Test - public void testRunOutOfProducersInThePool() throws Exception { - String topic = "flink-kafka-run-out-of-producers"; - - try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic)) { - - testHarness.setup(); - testHarness.open(); - - for (int i = 0; i < FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE * 2; i++) { - testHarness.processElement(i, i * 2); - testHarness.snapshot(i, i * 2 + 1); - } - } - catch (Exception ex) { - if (!ex.getCause().getMessage().startsWith("Too many ongoing")) { - throw ex; - } - } - deleteTestTopic(topic); - } - - // shut down a Kafka broker - private void failBroker(int brokerId) { - KafkaServer toShutDown = null; - for (KafkaServer server : kafkaServer.getBrokers()) { - - if (kafkaServer.getBrokerId(server) == brokerId) { - toShutDown = server; - break; - } - } - - if (toShutDown == null) { - StringBuilder listOfBrokers = new StringBuilder(); - for (KafkaServer server : kafkaServer.getBrokers()) { - listOfBrokers.append(kafkaServer.getBrokerId(server)); - listOfBrokers.append(" ; "); - } - - throw new IllegalArgumentException("Cannot find broker to shut down: " + brokerId - + " ; available brokers: " + listOfBrokers.toString()); - } else { - toShutDown.shutdown(); - toShutDown.awaitShutdown(); - } - } - - private void assertRecord(String topicName, String expectedKey, String expectedValue) { - try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(extraProperties)) { - kafkaConsumer.subscribe(Collections.singletonList(topicName)); - ConsumerRecords<String, String> records = kafkaConsumer.poll(10000); - - ConsumerRecord<String, String> record = Iterables.getOnlyElement(records); - assertEquals(expectedKey, record.key()); - assertEquals(expectedValue, record.value()); - } - } - - private void closeIgnoringProducerFenced(AutoCloseable autoCloseable) throws Exception { - try { - autoCloseable.close(); - } - catch (Exception ex) { - if (!(ex.getCause() instanceof ProducerFencedException)) { - throw ex; - } - } - } - - private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(String topic) throws Exception { - return createTestHarness(topic, 1, 1, 0); - } - - private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness( - String topic, - int maxParallelism, - int parallelism, - int subtaskIndex) throws Exception { - Properties properties = createProperties(); - - FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>( - topic, - integerKeyedSerializationSchema, - properties, - FlinkKafkaProducer011.Semantic.EXACTLY_ONCE); - - return new OneInputStreamOperatorTestHarness<>( - new StreamSink<>(kafkaProducer), - maxParallelism, - parallelism, - subtaskIndex, - IntSerializer.INSTANCE); - } - - private Properties createProperties() { - Properties properties = new Properties(); - properties.putAll(standardProps); - properties.putAll(secureProps); - properties.put(FlinkKafkaProducer011.KEY_DISABLE_METRICS, "true"); - return properties; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/856b6baf/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java new file mode 100644 index 0000000..ab26f8b --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.Properties; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for our own {@link FlinkKafkaProducer}. + */ +@SuppressWarnings("serial") +public class FlinkKafkaProducerTest extends KafkaTestBase { + protected String transactionalId; + protected Properties extraProperties; + + @Before + public void before() { + transactionalId = UUID.randomUUID().toString(); + extraProperties = new Properties(); + extraProperties.putAll(standardProps); + extraProperties.put("transactional.id", transactionalId); + extraProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + extraProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + extraProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + extraProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + extraProperties.put("isolation.level", "read_committed"); + } + + @Test(timeout = 30000L) + public void testHappyPath() throws IOException { + String topicName = "flink-kafka-producer-happy-path"; + try (Producer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); + kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42")); + kafkaProducer.commitTransaction(); + } + assertRecord(topicName, "42", "42"); + deleteTestTopic(topicName); + } + + @Test(timeout = 30000L) + public void testResumeTransaction() throws IOException { + String topicName = "flink-kafka-producer-resume-transaction"; + try (FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); + kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42")); + kafkaProducer.flush(); + long producerId = kafkaProducer.getProducerId(); + short epoch = kafkaProducer.getEpoch(); + + try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) { + resumeProducer.resumeTransaction(producerId, epoch); + resumeProducer.commitTransaction(); + } + + assertRecord(topicName, "42", "42"); + + // this shouldn't throw - in case of network split, old producer might attempt to commit it's transaction + kafkaProducer.commitTransaction(); + + // this shouldn't fail also, for same reason as above + try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) { + resumeProducer.resumeTransaction(producerId, epoch); + resumeProducer.commitTransaction(); + } + } + deleteTestTopic(topicName); + } + + private void assertRecord(String topicName, String expectedKey, String expectedValue) { + try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(extraProperties)) { + kafkaConsumer.subscribe(Collections.singletonList(topicName)); + ConsumerRecords<String, String> records = kafkaConsumer.poll(10000); + + ConsumerRecord<String, String> record = Iterables.getOnlyElement(records); + assertEquals(expectedKey, record.key()); + assertEquals(expectedValue, record.value()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/856b6baf/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTests.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTests.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTests.java deleted file mode 100644 index 18bbd8f..0000000 --- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTests.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer; - -import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.util.Collections; -import java.util.Properties; -import java.util.UUID; - -import static org.junit.Assert.assertEquals; - -/** - * Tests for our own {@link FlinkKafkaProducer}. - */ -@SuppressWarnings("serial") -public class FlinkKafkaProducerTests extends KafkaTestBase { - protected String transactionalId; - protected Properties extraProperties; - - @Before - public void before() { - transactionalId = UUID.randomUUID().toString(); - extraProperties = new Properties(); - extraProperties.putAll(standardProps); - extraProperties.put("transactional.id", transactionalId); - extraProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - extraProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - extraProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - extraProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - extraProperties.put("isolation.level", "read_committed"); - } - - @Test(timeout = 30000L) - public void testHappyPath() throws IOException { - String topicName = "flink-kafka-producer-happy-path"; - try (Producer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) { - kafkaProducer.initTransactions(); - kafkaProducer.beginTransaction(); - kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42")); - kafkaProducer.commitTransaction(); - } - assertRecord(topicName, "42", "42"); - deleteTestTopic(topicName); - } - - @Test(timeout = 30000L) - public void testResumeTransaction() throws IOException { - String topicName = "flink-kafka-producer-resume-transaction"; - try (FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) { - kafkaProducer.initTransactions(); - kafkaProducer.beginTransaction(); - kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42")); - kafkaProducer.flush(); - long producerId = kafkaProducer.getProducerId(); - short epoch = kafkaProducer.getEpoch(); - - try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) { - resumeProducer.resumeTransaction(producerId, epoch); - resumeProducer.commitTransaction(); - } - - assertRecord(topicName, "42", "42"); - - // this shouldn't throw - in case of network split, old producer might attempt to commit it's transaction - kafkaProducer.commitTransaction(); - - // this shouldn't fail also, for same reason as above - try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) { - resumeProducer.resumeTransaction(producerId, epoch); - resumeProducer.commitTransaction(); - } - } - deleteTestTopic(topicName); - } - - private void assertRecord(String topicName, String expectedKey, String expectedValue) { - try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(extraProperties)) { - kafkaConsumer.subscribe(Collections.singletonList(topicName)); - ConsumerRecords<String, String> records = kafkaConsumer.poll(10000); - - ConsumerRecord<String, String> record = Iterables.getOnlyElement(records); - assertEquals(expectedKey, record.key()); - assertEquals(expectedValue, record.value()); - } - } -}
