Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6040#discussion_r189031464
  
    --- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Flink9349Test.java
 ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.internal;
    +
    +import org.apache.flink.api.common.serialization.SimpleStringSchema;
    +import org.apache.flink.core.testutils.MultiShotLatch;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
    +import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
    +import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
    +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
    +import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
    +import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +import org.powermock.core.classloader.annotations.PrepareForTest;
    +import org.powermock.modules.junit4.PowerMockRunner;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.junit.Assert.assertFalse;
    +import static org.mockito.Mockito.anyLong;
    +import static org.powermock.api.mockito.PowerMockito.doAnswer;
    +import static org.powermock.api.mockito.PowerMockito.mock;
    +import static org.powermock.api.mockito.PowerMockito.when;
    +import static org.powermock.api.mockito.PowerMockito.whenNew;
    +
    +/**
    + * Unit tests for the {@link Flink9349Test}.
    + */
    +@RunWith(PowerMockRunner.class)
    +@PrepareForTest(KafkaConsumerThread.class)
    +public class Flink9349Test {
    +   @Test
    +   public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws 
Exception {
    +
    +           // test data
    +           final KafkaTopicPartition testPartition = new 
KafkaTopicPartition("test", 42);
    +           final Map<KafkaTopicPartition, Long> testCommitData = new 
HashMap<>();
    +           testCommitData.put(testPartition, 11L);
    +
    +           // to synchronize when the consumer is in its blocking method
    +           final OneShotLatch sync = new OneShotLatch();
    +
    +           // ----- the mock consumer with blocking poll calls ----
    +           final MultiShotLatch blockerLatch = new MultiShotLatch();
    +
    +           KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
    +           when(mockConsumer.poll(anyLong())).thenAnswer(new 
Answer<ConsumerRecords<?, ?>>() {
    +
    +                   @Override
    +                   public ConsumerRecords<?, ?> answer(InvocationOnMock 
invocation) throws InterruptedException {
    +                           sync.trigger();
    +                           blockerLatch.await();
    +                           return ConsumerRecords.empty();
    +                   }
    +           });
    +
    +           doAnswer(new Answer<Void>() {
    +                   @Override
    +                   public Void answer(InvocationOnMock invocation) {
    +                           blockerLatch.trigger();
    +                           return null;
    +                   }
    +           }).when(mockConsumer).wakeup();
    +
    +           // make sure the fetcher creates the mock consumer
    +           
whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
    +
    +           // ----- create the test fetcher -----
    +
    +           @SuppressWarnings("unchecked")
    +           SourceFunction.SourceContext<String> sourceContext = 
mock(SourceFunction.SourceContext.class);
    +           Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
    +                   Collections.singletonMap(new 
KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
    +           KeyedDeserializationSchema<String> schema = new 
KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
    +
    +           final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
    +                   sourceContext,
    +                   partitionsWithInitialOffsets,
    +                   null, /* periodic watermark extractor */
    +                   null, /* punctuated watermark extractor */
    +                   new TestProcessingTimeService(),
    +                   10, /* watermark interval */
    +                   this.getClass().getClassLoader(),
    +                   "task_name",
    +                   schema,
    +                   new Properties(),
    +                   0L,
    +                   new UnregisteredMetricsGroup(),
    +                   new UnregisteredMetricsGroup(),
    +                   false);
    +
    +           // ----- run the fetcher -----
    +
    +           final AtomicReference<Throwable> error = new 
AtomicReference<>();
    +           int fetchTasks = 2;
    +           final CountDownLatch latch = new CountDownLatch(fetchTasks);
    +           ExecutorService service = 
Executors.newFixedThreadPool(fetchTasks + 1);
    +
    +           service.submit(new Thread("fetcher runner ") {
    +                   @Override
    +                   public void run() {
    +                           try {
    +                                   latch.await();
    +                                   fetcher.runFetchLoop();
    +                           } catch (Throwable t) {
    +                                   error.set(t);
    +                           }
    +                   }
    +           });
    +           for (int i = 0; i < fetchTasks; i++) {
    +                   service.submit(new Thread("add partitions " + i) {
    +
    +                           @Override
    +                           public void run() {
    +                                   try {
    +                                           List<KafkaTopicPartition> 
newPartitions = new ArrayList<>();
    +                                           for (int i = 0; i < 1000; i++) {
    +                                                   
newPartitions.add(testPartition);
    +                                           }
    +                                           
fetcher.addDiscoveredPartitions(newPartitions);
    +                                           latch.countDown();
    +                                           //latch.await();
    +                                           for (int i = 0; i < 100; i++) {
    +                                                   
fetcher.addDiscoveredPartitions(newPartitions);
    +                                                   Thread.sleep(1L);
    +                                           }
    +                                   } catch (Throwable t) {
    +                                           error.set(t);
    +                                   }
    +                           }
    +                   });
    +           }
    +
    +           service.awaitTermination(1L, TimeUnit.SECONDS);
    +
    +           // wait until the fetcher has reached the method of interest
    +           sync.await();
    +
    +           // ----- trigger the offset commit -----
    --- End diff --
    
    I don't think this is required for the scope of interest of this test, is 
it?


---

Reply via email to