[
https://issues.apache.org/jira/browse/FLINK-5048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15662979#comment-15662979
]
ASF GitHub Bot commented on FLINK-5048:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/2789#discussion_r87746504
--- Diff:
flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
---
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import
org.apache.flink.streaming.connectors.kafka.internal.Handover.WakeupException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for the {@link Handover} between Kafka Consumer Thread and the
fetcher's main thread.
+ */
+public class HandoverTest {
+
+ //
------------------------------------------------------------------------
+ // test produce / consumer
+ //
------------------------------------------------------------------------
+
+ @Test
+ public void testWithVariableProducer() throws Exception {
+ runProducerConsumerTest(500, 2, 0);
+ }
+
+ @Test
+ public void testWithVariableConsumer() throws Exception {
+ runProducerConsumerTest(500, 0, 2);
+ }
+
+ @Test
+ public void testWithVariableBoth() throws Exception {
+ runProducerConsumerTest(500, 2, 2);
+ }
+
+ private void runProducerConsumerTest(int numRecords, int
maxProducerDelay, int maxConsumerDelay) throws Exception {
+ // generate test data
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ final ConsumerRecords<byte[], byte[]>[] data = new
ConsumerRecords[numRecords];
+ for (int i = 0; i < numRecords; i++) {
+ data[i] = createTestRecords();
+ }
+
+ final Handover handover = new Handover();
+
+ ProducerThread producer = new ProducerThread(handover, data,
maxProducerDelay);
+ ConsumerThread consumer = new ConsumerThread(handover, data,
maxConsumerDelay);
+
+ consumer.start();
+ producer.start();
+
+ // sync first on the consumer, so it propagates assertion errors
+ consumer.sync();
+ producer.sync();
+ }
+
+ //
------------------------------------------------------------------------
+ // test error propagation
+ //
------------------------------------------------------------------------
+
+ @Test
+ public void testPublishErrorOnEmptyHandover() throws Exception {
+ final Handover handover = new Handover();
+
+ Exception error = new Exception();
+ handover.reportError(error);
+
+ try {
+ handover.pollNext();
+ fail("should throw an exception");
+ }
+ catch (Exception e) {
+ assertEquals(error, e);
+ }
+ }
+
+ @Test
+ public void testPublishErrorOnFullHandover() throws Exception {
+ final Handover handover = new Handover();
+ handover.produce(createTestRecords());
+
+ IOException error = new IOException();
+ handover.reportError(error);
+
+ try {
+ handover.pollNext();
+ fail("should throw an exception");
+ }
+ catch (Exception e) {
+ assertEquals(error, e);
+ }
+ }
+
+ @Test
+ public void testExceptionMarksClosedOnEmpty() throws Exception {
+ final Handover handover = new Handover();
+
+ IllegalStateException error = new IllegalStateException();
+ handover.reportError(error);
+
+ try {
+ handover.produce(createTestRecords());
+ fail("should throw an exception");
+ }
+ catch (Handover.ClosedException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testExceptionMarksClosedOnFull() throws Exception {
+ final Handover handover = new Handover();
+ handover.produce(createTestRecords());
+
+ LinkageError error = new LinkageError();
+ handover.reportError(error);
+
+ try {
+ handover.produce(createTestRecords());
+ fail("should throw an exception");
+ }
+ catch (Handover.ClosedException e) {
+ // expected
+ }
+ }
+
+ //
------------------------------------------------------------------------
+ // test closing behavior
+ //
------------------------------------------------------------------------
+
+ @Test
+ public void testCloseEmptyForConsumer() throws Exception {
+ final Handover handover = new Handover();
+ handover.close();
+
+ try {
+ handover.pollNext();
+ fail("should throw an exception");
+ }
+ catch (Handover.ClosedException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testCloseFullForConsumer() throws Exception {
+ final Handover handover = new Handover();
+ handover.produce(createTestRecords());
+ handover.close();
+
+ try {
+ handover.pollNext();
+ fail("should throw an exception");
+ }
+ catch (Handover.ClosedException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testCloseEmptyForProducer() throws Exception {
+ final Handover handover = new Handover();
+ handover.close();
+
+ try {
+ handover.produce(createTestRecords());
+ fail("should throw an exception");
+ }
+ catch (Handover.ClosedException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testCloseFullForProducer() throws Exception {
+ final Handover handover = new Handover();
+ handover.produce(createTestRecords());
+ handover.close();
+
+ try {
+ handover.produce(createTestRecords());
+ fail("should throw an exception");
+ }
+ catch (Handover.ClosedException e) {
+ // expected
+ }
+ }
+
+ //
------------------------------------------------------------------------
+ // test wake up behavior
+ //
------------------------------------------------------------------------
+
+ @Test
+ public void testWakeupDoesNotWakeWhenEmpty() throws Exception {
+ Handover handover = new Handover();
+ handover.wakeupProducer();
+
+ // produce into a woken but empty handover
+ try {
+ handover.produce(createTestRecords());
+ }
+ catch (Handover.WakeupException e) {
+ fail();
+ }
+
+ // handover now has records, next time we wakeup and produce it
needs
+ // to throw an exception
+ handover.wakeupProducer();
+ try {
+ handover.produce(createTestRecords());
+ fail("should throw an exception");
+ }
+ catch (Handover.WakeupException e) {
+ // expected
+ }
+
+ // empty the handover
+ assertNotNull(handover.pollNext());
+
+ // producing into an empty handover should work
+ try {
+ handover.produce(createTestRecords());
+ }
+ catch (Handover.WakeupException e) {
+ fail();
+ }
+ }
+
+ @Test
+ public void testWakeupWakesOnlyOnce() throws Exception {
+ // create a full handover
+ final Handover handover = new Handover();
+ handover.produce(createTestRecords());
+
+ handover.wakeupProducer();
+
+ try {
+ handover.produce(createTestRecords());
+ fail();
+ } catch (WakeupException e) {
+ // expected
+ }
+
+ CheckedThread producer = new CheckedThread() {
+ @Override
+ public void go() throws Exception {
+ handover.produce(createTestRecords());
+ }
+ };
+ producer.start();
+
+ // the producer must go blocking
+ producer.waitUntilThreadHoldsLock(10000);
+
+ // release the thread by consuming something
+ assertNotNull(handover.pollNext());
+ producer.sync();
+ }
+
+ //
------------------------------------------------------------------------
+ // utilities
+ //
------------------------------------------------------------------------
+
+ @SuppressWarnings("unchecked")
+ static ConsumerRecords<byte[], byte[]> createTestRecords() {
--- End diff --
Might as well make this private.
> Kafka Consumer (0.9/0.10) threading model leads to problematic cancellation
> behavior
> ---------------------------------------------------------------------------------
>
> Key: FLINK-5048
> URL: https://issues.apache.org/jira/browse/FLINK-5048
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.1.3
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 1.2.0
>
>
> The {{FlinkKafkaConsumer}} (0.9 / 0.10) spawns a separate thread that
> operates the KafkaConsumer. That thread is shielded from interrupts, because
> the Kafka Consumer has not been handling thread interrupts well.
> Since that thread is also the thread that emits records, it may block in the
> network stack (backpressure) or in chained operators. The latter case leads to
> situations where cancellations get very slow unless that thread is
> interrupted (which it cannot be).
> I propose to change the thread model as follows:
> - A spawned consumer thread pulls from the KafkaConsumer and pushes its
> pulled batch of records into a blocking queue (size one)
> - The main thread of the task will pull the record batches from the
> blocking queue and emit the records.
> This actually allows for some additional I/O overlap while limiting the
> additional memory consumption - only two batches are ever held, one being
> fetched and one being emitted.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)