Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5955#discussion_r188512949
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java ---
    @@ -0,0 +1,418 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.checkpointing.utils;
    +
    +import org.apache.flink.api.common.functions.RichFlatMapFunction;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeHint;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    +import org.apache.flink.runtime.state.StateBackendLoader;
    +import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    +import org.apache.flink.streaming.api.TimeCharacteristic;
    +import org.apache.flink.streaming.api.datastream.BroadcastStream;
    +import org.apache.flink.streaming.api.datastream.KeyedStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.util.migration.MigrationVersion;
    +import org.apache.flink.util.Collector;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to (potentially)
    + * cover migrating for multiple previous Flink versions, as well as for different state backends.
    + */
    +@RunWith(Parameterized.class)
    +public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase {
    +
    +   private static final int NUM_SOURCE_ELEMENTS = 4;
    +
    +   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
    +   private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode =
    +                   StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
    +
    +   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
    +   public static Collection<Tuple2<MigrationVersion, String>> parameters () {
    +           return Arrays.asList(
    +                           Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
    +                           Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
    +   }
    +
    +   private final MigrationVersion testMigrateVersion;
    +   private final String testStateBackend;
    +
    +   public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
    +           this.testMigrateVersion = testMigrateVersionAndBackend.f0;
    +           this.testStateBackend = testMigrateVersionAndBackend.f1;
    +   }
    +
    +   @Test
    +   public void testSavepoint() throws Exception {
    +
    +           final int parallelism = 4;
    +
    +           final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +           env.setRestartStrategy(RestartStrategies.noRestart());
    +           env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +
    +           switch (testStateBackend) {
    +                   case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
    +                           env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
    +                           break;
    +                   case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
    +                           env.setStateBackend(new MemoryStateBackend());
    +                           break;
    +                   default:
    +                           throw new UnsupportedOperationException();
    +           }
    +
    +           env.enableCheckpointing(500);
    +           env.setParallelism(parallelism);
    +           env.setMaxParallelism(parallelism);
    +
    +           SourceFunction<Tuple2<Long, Long>> nonParallelSource;
    +           SourceFunction<Tuple2<Long, Long>> nonParallelSourceB;
    +           SourceFunction<Tuple2<Long, Long>> parallelSource;
    +           SourceFunction<Tuple2<Long, Long>> parallelSourceB;
    +           RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> flatMap;
    +           OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> timelyOperator;
    +           KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> firstBroadcastFunction;
    +           KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> secondBroadcastFunction;
    +
    +           final Map<Long, Long> expectedFirstState = new HashMap<>();
    +           expectedFirstState.put(0L, 0L);
    +           expectedFirstState.put(1L, 1L);
    +           expectedFirstState.put(2L, 2L);
    +           expectedFirstState.put(3L, 3L);
    +
    +           final Map<String, String> expectedSecondState = new HashMap<>();
    +           expectedSecondState.put("0", "0");
    +           expectedSecondState.put("1", "1");
    +           expectedSecondState.put("2", "2");
    +           expectedSecondState.put("3", "3");
    +
    +           final Map<String, String> expectedThirdState = new HashMap<>();
    +           expectedThirdState.put("0", "0");
    +           expectedThirdState.put("1", "1");
    +           expectedThirdState.put("2", "2");
    +           expectedThirdState.put("3", "3");
    +
    +           if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.PERFORM_SAVEPOINT) {
    +                   nonParallelSource = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
    +                   nonParallelSourceB = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
    +                   parallelSource = new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
    +                   parallelSourceB = new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
    +                   flatMap = new MigrationTestUtils.CheckpointingKeyedStateFlatMap();
    +                   timelyOperator = new MigrationTestUtils.CheckpointingTimelyStatefulOperator();
    +                   firstBroadcastFunction = new KeyedBroadcastFunction();
    +                   secondBroadcastFunction = new KeyedSingleBroadcastFunction();
    +           } else if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT) {
    +                   nonParallelSource = new MigrationTestUtils.CheckingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
    +                   nonParallelSourceB = new MigrationTestUtils.CheckingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
    +                   parallelSource = new MigrationTestUtils.CheckingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
    +                   parallelSourceB = new MigrationTestUtils.CheckingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
    +                   flatMap = new MigrationTestUtils.CheckingKeyedStateFlatMap();
    +                   timelyOperator = new MigrationTestUtils.CheckingTimelyStatefulOperator();
    +                   firstBroadcastFunction = new CheckingKeyedBroadcastFunction(expectedFirstState, expectedSecondState);
    --- End diff --
    
    Actually, the only state this test should cover is the broadcast state, no?
    Why do we need to add all the others (apart from the source)?
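    For illustration, a minimal sketch of what a broadcast-state-only topology inside testSavepoint() could look like, reusing env, nonParallelSource and nonParallelSourceB from the diff above and only the classes the file already imports. The state name and the anonymous function below are hypothetical, not the PR's KeyedBroadcastFunction / CheckingKeyedBroadcastFunction, and this shows only the savepoint-generation side; the verification side would read the restored broadcast state and assert it against the expected maps.
    
        // Hypothetical sketch (not part of the PR): keyed input connected to a broadcast input,
        // with broadcast state as the only operator state besides the sources' own state.
        MapStateDescriptor<Long, Long> broadcastStateDesc = new MapStateDescriptor<>(
                "broadcast-state-1", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
    
        KeyedStream<Tuple2<Long, Long>, Long> keyedInput = env
                .addSource(nonParallelSource)
                .keyBy(new KeySelector<Tuple2<Long, Long>, Long>() {
                        @Override
                        public Long getKey(Tuple2<Long, Long> value) {
                                return value.f0;
                        }
                });
    
        BroadcastStream<Tuple2<Long, Long>> broadcastInput = env
                .addSource(nonParallelSourceB)
                .broadcast(broadcastStateDesc);
    
        keyedInput
                .connect(broadcastInput)
                .process(new KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                        @Override
                        public void processElement(Tuple2<Long, Long> value, ReadOnlyContext ctx, Collector<Tuple2<Long, Long>> out) throws Exception {
                                // forward keyed elements unchanged; no keyed state involved
                                out.collect(value);
                        }
    
                        @Override
                        public void processBroadcastElement(Tuple2<Long, Long> value, Context ctx, Collector<Tuple2<Long, Long>> out) throws Exception {
                                // the broadcast state written here is what the restored job would verify
                                ctx.getBroadcastState(broadcastStateDesc).put(value.f0, value.f1);
                        }
                });
    
    That would keep the savepoint focused on the broadcast state itself, while the sources still carry their own (union) list state for producing and verifying the elements.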

