[ https://issues.apache.org/jira/browse/FLINK-6996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16073276#comment-16073276 ]
ASF GitHub Bot commented on FLINK-6996:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/4206#discussion_r125411227
--- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java ---
@@ -172,6 +195,144 @@ public void cancel() {
}
}
+    /**
+     * Tests the at-least-once semantic for the simple writes into Kafka.
+     */
+    @Test
+    public void testOneToOneAtLeastOnceRegularSink() throws Exception {
+        testOneToOneAtLeastOnce(true);
+    }
+
+    /**
+     * Tests the at-least-once semantic for the simple writes into Kafka.
+     */
+    @Test
+    public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
+        testOneToOneAtLeastOnce(false);
+    }
+
+    /**
+     * This test sets KafkaProducer so that it will not automatically flush the data and
+     * fails the broker to check whether FlinkKafkaProducer flushed records manually on snapshotState.
+     */
+    protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception {
+        final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator";
+        final int partition = 0;
+        final int numElements = 1000;
+        final int failAfterElements = 333;
+
+        createTestTopic(topic, 1, 1);
+
+        TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+        KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper(schema);
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(500);
+        env.setParallelism(1);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.getConfig().disableSysoutLogging();
+
+        Properties properties = new Properties();
+        properties.putAll(standardProps);
+        properties.putAll(secureProps);
+        // decrease timeout and block time from 60s down to 10s - this is how long KafkaProducer will try to send pending (not flushed) data on close()
+        properties.setProperty("timeout.ms", "10000");
+        properties.setProperty("max.block.ms", "10000");
+        // increase batch.size and linger.ms - this tells KafkaProducer to batch produced events instead of flushing them immediately
+        properties.setProperty("batch.size", "10240000");
+        properties.setProperty("linger.ms", "10000");
+
+        int leaderId = kafkaServer.getLeaderToShutDown(topic);
+        BrokerRestartingMapper.resetState();
+
+        // process exactly failAfterElements number of elements and then shut down the Kafka broker and fail the application
+        DataStream<Integer> inputStream = env
+            .fromCollection(getIntegersSequence(numElements))
+            .map(new BrokerRestartingMapper<Integer>(leaderId, failAfterElements));
+
+        StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
+            @Override
+            public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+                return partition;
+            }
+        });
+
+        if (regularSink) {
+            inputStream.addSink(kafkaSink.getUserFunction());
+        }
+        else {
+            kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
+                @Override
+                public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+                    return partition;
+                }
+            });
+        }
+
+        FailingIdentityMapper.failedBefore = false;
+        try {
+            env.execute("One-to-one at least once test");
+            fail("Job should fail!");
+        }
+        catch (Exception ex) {
--- End diff ---
I think we need a more specific exception here.
There may be actual exceptions thrown by Flink that would be masked by this catch-all assumption.
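
As an illustration of that suggestion (not part of the PR), the catch clause could be narrowed and the cause chain inspected, so that only the deliberately injected failure is swallowed. This is a rough sketch under assumptions: it presumes the job failure surfaces as a JobExecutionException, and ExpectedTestException is a hypothetical marker type standing in for whatever the test's BrokerRestartingMapper actually throws.

    try {
        env.execute("One-to-one at least once test");
        fail("Job should fail!");
    }
    catch (JobExecutionException ex) {
        // Walk the cause chain; only the expected, deliberately injected failure may be swallowed.
        boolean expectedFailure = false;
        for (Throwable cause = ex; cause != null; cause = cause.getCause()) {
            if (cause instanceof ExpectedTestException) { // hypothetical marker exception
                expectedFailure = true;
                break;
            }
        }
        if (!expectedFailure) {
            throw ex; // any other Flink or Kafka error should still fail the test
        }
    }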
> FlinkKafkaProducer010 doesn't guarantee at-least-once semantic
> --------------------------------------------------------------
>
> Key: FLINK-6996
> URL: https://issues.apache.org/jira/browse/FLINK-6996
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1
> Reporter: Piotr Nowojski
> Assignee: Piotr Nowojski
>
> FlinkKafkaProducer010 doesn't implement the CheckpointedFunction interface. This means,
> when it's used like a "regular sink function" (option a from
> [the java doc|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.html]),
> it will not flush the data on "snapshotState" as it is supposed to.
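
For illustration only (a sketch under assumptions, not the actual FlinkKafkaProducer010 code): the behaviour the issue asks for amounts to a sink that implements CheckpointedFunction and flushes its KafkaProducer in snapshotState(), so that client-side buffered records are acknowledged before a checkpoint completes. The class below is hypothetical; the Properties passed in are assumed to already contain bootstrap.servers and byte-array serializers.

    import java.util.Properties;

    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Hypothetical example class, not part of Flink.
    public class FlushingKafkaSink extends RichSinkFunction<String> implements CheckpointedFunction {

        private final String topic;
        private final Properties properties;
        private transient KafkaProducer<byte[], byte[]> producer;

        public FlushingKafkaSink(String topic, Properties properties) {
            this.topic = topic;
            this.properties = properties;
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            // Called before any records are processed; create the Kafka client here.
            producer = new KafkaProducer<>(properties);
        }

        @Override
        public void invoke(String value) throws Exception {
            // send() only hands the record to the client's internal buffer.
            producer.send(new ProducerRecord<byte[], byte[]>(topic, value.getBytes()));
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // Block until every buffered record is acknowledged by the brokers.
            // Without this flush a checkpoint could complete while data is still
            // sitting in the client buffer, which breaks at-least-once.
            producer.flush();
        }

        @Override
        public void close() throws Exception {
            if (producer != null) {
                producer.close();
            }
        }
    }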