[ 
https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15992802#comment-15992802
 ] 

ASF GitHub Bot commented on FLINK-5969:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3778#discussion_r114298601
  
    --- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom12MigrationTest.java
 ---
    @@ -0,0 +1,491 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertTrue;
    +import static org.mockito.Matchers.anyMapOf;
    +import static org.mockito.Mockito.doAnswer;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.when;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.streaming.api.TimeCharacteristic;
    +import 
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    +import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.api.operators.StreamSource;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
    +import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
    +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
    +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
    +import org.apache.flink.streaming.util.OperatorSnapshotUtil;
    +import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
    +import org.apache.flink.util.SerializedValue;
    +import org.junit.Assert;
    +import org.junit.Ignore;
    +import org.junit.Test;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +
    +/**
    + * Tests for checking whether {@link FlinkKafkaConsumerBase} can restore 
from snapshots that were
    + * done using the Flink 1.2 {@link FlinkKafkaConsumerBase}.
    + *
    + * <p>For regenerating the binary snapshot files run {@link 
#writeSnapshot()} on the Flink 1.2
    + * branch.
    + */
    +public class FlinkKafkaConsumerBaseFrom12MigrationTest {
    +
    +
    +   /**
    +    * Manually run this to write binary snapshot data.
    +    */
    +   @Ignore
    +   @Test
    +   public void writeSnapshot() throws Exception {
    +           final HashMap<KafkaTopicPartition, Long> state = new 
HashMap<>();
    +           state.put(new KafkaTopicPartition("abc", 13), 16768L);
    +           state.put(new KafkaTopicPartition("def", 7), 987654321L);
    +           
writeSnapshot("src/test/resources/kafka-consumer-migration-test-flink1.2-snapshot",
 state);
    +
    +           final HashMap<KafkaTopicPartition, Long> emptyState = new 
HashMap<>();
    +           
writeSnapshot("src/test/resources/kafka-consumer-migration-test-flink1.2-snapshot-empty-state",
 emptyState);
    +   }
    +
    +   private void writeSnapshot(String path, HashMap<KafkaTopicPartition, 
Long> state) throws Exception {
    +
    +           final OneShotLatch latch = new OneShotLatch();
    +           final AbstractFetcher<String, ?> fetcher = 
mock(AbstractFetcher.class);
    +
    +           doAnswer(new Answer<Void>() {
    +                   @Override
    +                   public Void answer(InvocationOnMock invocation) throws 
Throwable {
    +                           latch.trigger();
    +                           return null;
    +                   }
    +           }).when(fetcher).runFetchLoop();
    +
    +           when(fetcher.snapshotCurrentState()).thenReturn(state);
    +
    +           final List<KafkaTopicPartition> partitions = new ArrayList<>();
    +           partitions.add(new KafkaTopicPartition("abc", 13));
    +           partitions.add(new KafkaTopicPartition("def", 7));
    +
    +           final DummyFlinkKafkaConsumer<String> consumerFunction = new 
DummyFlinkKafkaConsumer<>(
    +                           new FetcherFactory<String>() {
    +                                   private static final long 
serialVersionUID = -2803131905656983619L;
    +
    +                                   @Override
    +                                   public AbstractFetcher<String, ?> 
createFetcher() {
    +                                           return fetcher;
    +                                   }
    +                           },
    +                           partitions);
    +
    +           StreamSource<String, DummyFlinkKafkaConsumer<String>> 
consumerOperator =
    +                           new StreamSource<>(consumerFunction);
    +
    +
    +           final AbstractStreamOperatorTestHarness<String> testHarness =
    +                           new 
AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);
    +
    +           
testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    +
    +           testHarness.setup();
    +           testHarness.open();
    +
    +           final Throwable[] error = new Throwable[1];
    +
    +           // run the source asynchronously
    +           Thread runner = new Thread() {
    +                   @Override
    +                   public void run() {
    +                           try {
    +                                   consumerFunction.run(new 
DummySourceContext() {
    +                                           @Override
    +                                           public void collect(String 
element) {
    +                                                   latch.trigger();
    +                                           }
    +                                   });
    +                           }
    +                           catch (Throwable t) {
    +                                   t.printStackTrace();
    +                                   error[0] = t;
    +                           }
    +                   }
    +           };
    +           runner.start();
    +
    +           if (!latch.isTriggered()) {
    +                   latch.await();
    +           }
    +
    +           final OperatorStateHandles snapshot;
    +           synchronized (testHarness.getCheckpointLock()) {
    +                   snapshot = testHarness.snapshot(0L, 0L);
    +           }
    +
    +           OperatorSnapshotUtil.writeStateHandle(snapshot, path);
    +
    +           consumerOperator.close();
    +           runner.join();
    +   }
    +
    +   @Test
    +   public void testRestoreWithEmptyStateNoPartitions() throws Exception {
    +           // 
--------------------------------------------------------------------
    +           //   prepare fake states
    +           // 
--------------------------------------------------------------------
    +
    +           final OneShotLatch latch = new OneShotLatch();
    +           final AbstractFetcher<String, ?> fetcher = 
mock(AbstractFetcher.class);
    +
    +           doAnswer(new Answer() {
    +                   @Override
    +                   public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
    +                           latch.trigger();
    +                           Assert.fail("This should never be called");
    +                           return null;
    +                   }
    +           
}).when(fetcher).restoreOffsets(anyMapOf(KafkaTopicPartition.class, 
Long.class));
    +
    +           doAnswer(new Answer<Void>() {
    +                   @Override
    +                   public Void answer(InvocationOnMock invocation) throws 
Throwable {
    +                           latch.trigger();
    +                           Assert.fail("This should never be called");
    +                           return null;
    +                   }
    +           }).when(fetcher).runFetchLoop();
    +
    +           final DummyFlinkKafkaConsumer<String> consumerFunction = new 
DummyFlinkKafkaConsumer<>(
    +                           new FetcherFactory<String>() {
    +                                   private static final long 
serialVersionUID = -2803131905656983619L;
    +
    +                                   @Override
    +                                   public AbstractFetcher<String, ?> 
createFetcher() {
    +                                           return fetcher;
    +                                   }
    +                           },
    +                           Collections.<KafkaTopicPartition>emptyList());
    +
    +           StreamSource<String, DummyFlinkKafkaConsumer<String>> 
consumerOperator =
    +                   new StreamSource<>(consumerFunction);
    +
    +           final AbstractStreamOperatorTestHarness<String> testHarness =
    +                   new 
AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);
    +
    +           
testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    +
    +           testHarness.setup();
    +           testHarness.initializeState(
    +                           OperatorSnapshotUtil.readStateHandle(
    +                                           
OperatorSnapshotUtil.getResourceFilename("kafka-consumer-migration-test-flink1.2-snapshot-empty-state")));
    +           testHarness.open();
    +
    +           final Throwable[] error = new Throwable[1];
    +
    +           // run the source asynchronously
    +           Thread runner = new Thread() {
    +                   @Override
    +                   public void run() {
    +                           try {
    +                                   consumerFunction.run(new 
DummySourceContext() {
    +                                           @Override
    +                                           public void collect(String 
element) {
    +                                                   latch.trigger();
    +                                                   Assert.fail("This 
should never be called.");
    +                                           }
    +
    +                                           @Override
    +                                           public void 
emitWatermark(Watermark mark) {
    +                                                   latch.trigger();
    +                                                   
assertEquals(Long.MAX_VALUE, mark.getTimestamp());
    +                                           }
    +                                   });
    +                           }
    +                           catch (Throwable t) {
    +                                   t.printStackTrace();
    +                                   error[0] = t;
    +                           }
    +                   }
    +           };
    +           runner.start();
    +
    +           if (!latch.isTriggered()) {
    +                   latch.await();
    +           }
    +
    +           consumerOperator.cancel();
    +           consumerOperator.close();
    +
    +           runner.interrupt();
    +           runner.join();
    +
    +           assertNull(error[0]);
    +   }
    +
    +   @Test
    +   public void testRestoreWithEmptyStateWithPartitions() throws Exception {
    +           final OneShotLatch latch = new OneShotLatch();
    +           final AbstractFetcher<String, ?> fetcher = 
mock(AbstractFetcher.class);
    +
    +           doAnswer(new Answer() {
    +                   @Override
    +                   public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
    +                           latch.trigger();
    +                           Assert.fail("This should never be called");
    +                           return null;
    +                   }
    +           
}).when(fetcher).restoreOffsets(anyMapOf(KafkaTopicPartition.class, 
Long.class));
    +
    +           doAnswer(new Answer<Void>() {
    +                   @Override
    +                   public Void answer(InvocationOnMock invocation) throws 
Throwable {
    +                           latch.trigger();
    +                           return null;
    +                   }
    +           }).when(fetcher).runFetchLoop();
    +
    +           final List<KafkaTopicPartition> partitions = new ArrayList<>();
    +           partitions.add(new KafkaTopicPartition("abc", 13));
    +           partitions.add(new KafkaTopicPartition("def", 7));
    +
    +           final DummyFlinkKafkaConsumer<String> consumerFunction = new 
DummyFlinkKafkaConsumer<>(
    +                           new FetcherFactory<String>() {
    +                                   private static final long 
serialVersionUID = -2803131905656983619L;
    +
    +                                   @Override
    +                                   public AbstractFetcher<String, ?> 
createFetcher() {
    +                                           return fetcher;
    +                                   }
    +                           },
    +                           partitions);
    +
    +           StreamSource<String, DummyFlinkKafkaConsumer<String>> 
consumerOperator =
    +                   new StreamSource<>(consumerFunction);
    +
    +           final AbstractStreamOperatorTestHarness<String> testHarness =
    +                   new 
AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);
    +
    +           
testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    +
    +           testHarness.setup();
    +           testHarness.initializeState(
    +                           OperatorSnapshotUtil.readStateHandle(
    +                                           
OperatorSnapshotUtil.getResourceFilename("kafka-consumer-migration-test-flink1.2-snapshot-empty-state")));
    +           testHarness.open();
    +
    +           final Throwable[] error = new Throwable[1];
    +
    +           // run the source asynchronously
    +           Thread runner = new Thread() {
    +                   @Override
    +                   public void run() {
    +                           try {
    +                                   consumerFunction.run(new 
DummySourceContext() {
    +                                           @Override
    +                                           public void collect(String 
element) {
    +                                                   latch.trigger();
    +                                                   Assert.fail("This 
should never be called.");
    +                                           }
    +
    +                                           @Override
    +                                           public void 
emitWatermark(Watermark mark) {
    +                                                   latch.trigger();
    +                                                   
assertEquals(Long.MAX_VALUE, mark.getTimestamp());
    +                                           }
    +                                   });
    +                           }
    +                           catch (Throwable t) {
    +                                   t.printStackTrace();
    +                                   error[0] = t;
    +                           }
    +                   }
    +           };
    +           runner.start();
    +
    +           if (!latch.isTriggered()) {
    +                   latch.await();
    +           }
    +
    +           consumerOperator.close();
    +           runner.interrupt();
    +           runner.join();
    +
    +           assertNull(error[0]);
    --- End diff --
    
    if it isn't null we should print the error.


> Add savepoint backwards compatibility tests from 1.2 to 1.3
> -----------------------------------------------------------
>
>                 Key: FLINK-5969
>                 URL: https://issues.apache.org/jira/browse/FLINK-5969
>             Project: Flink
>          Issue Type: Improvement
>          Components: Tests
>    Affects Versions: 1.3.0
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>             Fix For: 1.3.0
>
>
> We currently only have tests that test migration from 1.1 to 1.3, because we 
> added these tests when releasing Flink 1.2.
> We have to copy/migrate those tests:
>  - {{StatefulUDFSavepointMigrationITCase}}
>  - {{*MigrationTest}}
>  - {{AbstractKeyedCEPPatternOperator}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to