[hotfix] [kafka-tests] Clean up FlinkKafkaProducer011Tests

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7d3589e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7d3589e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7d3589e

Branch: refs/heads/master
Commit: b7d3589e7732b6458ac4c0ad936666d670cac87b
Parents: 8cdf2ff
Author: gyao <[email protected]>
Authored: Thu Oct 26 19:25:35 2017 +0200
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Thu Nov 2 12:32:28 2017 +0800

----------------------------------------------------------------------
 .../kafka/FlinkKafkaProducer011Tests.java       | 49 +-------------------
 1 file changed, 1 insertion(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b7d3589e/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
index 69c3ceb..381ba33 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
@@ -36,13 +35,10 @@ import kafka.server.KafkaServer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -68,7 +64,7 @@ public class FlinkKafkaProducer011Tests extends KafkaTestBase 
{
        protected TypeInformationSerializationSchema<Integer> 
integerSerializationSchema =
                        new 
TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new 
ExecutionConfig());
        protected KeyedSerializationSchema<Integer> 
integerKeyedSerializationSchema =
-                       new 
KeyedSerializationSchemaWrapper(integerSerializationSchema);
+                       new 
KeyedSerializationSchemaWrapper<>(integerSerializationSchema);
 
        @Before
        public void before() {
@@ -83,49 +79,6 @@ public class FlinkKafkaProducer011Tests extends 
KafkaTestBase {
                extraProperties.put("isolation.level", "read_committed");
        }
 
-       @Test(timeout = 30000L)
-       public void testHappyPath() throws IOException {
-               String topicName = "flink-kafka-producer-happy-path";
-               try (Producer<String, String> kafkaProducer = new 
FlinkKafkaProducer<>(extraProperties)) {
-                       kafkaProducer.initTransactions();
-                       kafkaProducer.beginTransaction();
-                       kafkaProducer.send(new ProducerRecord<>(topicName, 
"42", "42"));
-                       kafkaProducer.commitTransaction();
-               }
-               assertRecord(topicName, "42", "42");
-               deleteTestTopic(topicName);
-       }
-
-       @Test(timeout = 30000L)
-       public void testResumeTransaction() throws IOException {
-               String topicName = "flink-kafka-producer-resume-transaction";
-               try (FlinkKafkaProducer<String, String> kafkaProducer = new 
FlinkKafkaProducer<>(extraProperties)) {
-                       kafkaProducer.initTransactions();
-                       kafkaProducer.beginTransaction();
-                       kafkaProducer.send(new ProducerRecord<>(topicName, 
"42", "42"));
-                       kafkaProducer.flush();
-                       long producerId = kafkaProducer.getProducerId();
-                       short epoch = kafkaProducer.getEpoch();
-
-                       try (FlinkKafkaProducer<String, String> resumeProducer 
= new FlinkKafkaProducer<>(extraProperties)) {
-                               resumeProducer.resumeTransaction(producerId, 
epoch);
-                               resumeProducer.commitTransaction();
-                       }
-
-                       assertRecord(topicName, "42", "42");
-
-                       // this shouldn't throw - in case of network split, old 
producer might attempt to commit it's transaction
-                       kafkaProducer.commitTransaction();
-
-                       // this shouldn't fail also, for same reason as above
-                       try (FlinkKafkaProducer<String, String> resumeProducer 
= new FlinkKafkaProducer<>(extraProperties)) {
-                               resumeProducer.resumeTransaction(producerId, 
epoch);
-                               resumeProducer.commitTransaction();
-                       }
-               }
-               deleteTestTopic(topicName);
-       }
-
        @Test(timeout = 120_000L)
        public void testFlinkKafkaProducer011FailBeforeNotify() throws 
Exception {
                String topic = "flink-kafka-producer-fail-before-notify";

Reply via email to