zentol commented on a change in pull request #11443: [FLINK-14791][coordination] ResourceManager tracks ClusterPartitions URL: https://github.com/apache/flink/pull/11443#discussion_r400035559
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTrackerImplTest.java ########## @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.collection.IsEmptyCollection.empty; +import static org.junit.Assert.assertEquals; + +/** + * Test for the {@link ResourceManagerPartitionTrackerImpl}. 
+ */ +public class ResourceManagerPartitionTrackerImplTest extends TestLogger { + + private static final ClusterPartitionReport EMPTY_PARTITION_REPORT = new ClusterPartitionReport(Collections.emptySet()); + + private static final ResourceID TASK_EXECUTOR_ID_1 = ResourceID.generate(); + private static final ResourceID TASK_EXECUTOR_ID_2 = ResourceID.generate(); + private static final IntermediateDataSetID DATA_SET_ID = new IntermediateDataSetID(); + private static final ResultPartitionID PARTITION_ID_1 = new ResultPartitionID(); + private static final ResultPartitionID PARTITION_ID_2 = new ResultPartitionID(); + + @Test + public void testProcessEmptyClusterPartitionReport() { + TestClusterPartitionReleaser partitionReleaser = new TestClusterPartitionReleaser(); + final ResourceManagerPartitionTracker tracker = new ResourceManagerPartitionTrackerImpl(partitionReleaser); + + tracker.processTaskExecutorClusterPartitionReport(TASK_EXECUTOR_ID_1, EMPTY_PARTITION_REPORT); + assertThat(partitionReleaser.releaseCalls, empty()); + } + + /** + * Verifies that a task executor hosting multiple partitions of a data set receives a release call if a subset of + * its partitions is lost. 
+ */ + @Test + public void testReportProcessingWithPartitionLossOnSameTaskExecutor() { + TestClusterPartitionReleaser partitionReleaser = new TestClusterPartitionReleaser(); + final ResourceManagerPartitionTracker tracker = new ResourceManagerPartitionTrackerImpl(partitionReleaser); + + tracker.processTaskExecutorClusterPartitionReport( + TASK_EXECUTOR_ID_1, + createClusterPartitionReport(DATA_SET_ID, 2, PARTITION_ID_1, PARTITION_ID_2)); + + tracker.processTaskExecutorClusterPartitionReport( + TASK_EXECUTOR_ID_1, + createClusterPartitionReport(DATA_SET_ID, 2, PARTITION_ID_2)); + + assertThat(partitionReleaser.releaseCalls, contains(Tuple2.of(TASK_EXECUTOR_ID_1, Collections.singleton(DATA_SET_ID)))); + } + + /** + * Verifies that a task executor hosting partitions of a data set receives a release call if a partition of the + * data set is lost on another task executor. + */ + @Test + public void testReportProcessingWithPartitionLossOnOtherTaskExecutor() { + TestClusterPartitionReleaser partitionReleaser = new TestClusterPartitionReleaser(); + final ResourceManagerPartitionTracker tracker = new ResourceManagerPartitionTrackerImpl(partitionReleaser); + + tracker.processTaskExecutorClusterPartitionReport( + TASK_EXECUTOR_ID_1, + createClusterPartitionReport(DATA_SET_ID, 2, PARTITION_ID_1)); + + tracker.processTaskExecutorClusterPartitionReport( + TASK_EXECUTOR_ID_2, + createClusterPartitionReport(DATA_SET_ID, 2, PARTITION_ID_2)); + + tracker.processTaskExecutorClusterPartitionReport(TASK_EXECUTOR_ID_1, EMPTY_PARTITION_REPORT); + + assertThat(partitionReleaser.releaseCalls, contains(Tuple2.of(TASK_EXECUTOR_ID_2, Collections.singleton(DATA_SET_ID)))); + } + + @Test + public void testListDataSetsBasics() { + TestClusterPartitionReleaser partitionReleaser = new TestClusterPartitionReleaser(); + final ResourceManagerPartitionTracker tracker = new ResourceManagerPartitionTrackerImpl(partitionReleaser); + + assertEquals(0, tracker.listDataSets().size()); + + 
tracker.processTaskExecutorClusterPartitionReport( + TASK_EXECUTOR_ID_1, + createClusterPartitionReport(DATA_SET_ID, 1, PARTITION_ID_1)); + + final Map<IntermediateDataSetID, DataSetMetaInfo> listing = tracker.listDataSets(); + assertThat(listing, hasKey(DATA_SET_ID)); + DataSetMetaInfo metaInfo = listing.get(DATA_SET_ID); + assertEquals(1, metaInfo.getNumTotalPartitions()); + + tracker.processTaskExecutorClusterPartitionReport( + TASK_EXECUTOR_ID_1, + EMPTY_PARTITION_REPORT); + assertEquals(0, tracker.listDataSets().size()); + } + + @Test + public void testListDataSetsMultiplePartitionsOnSingleTaskExecutor() { + TestClusterPartitionReleaser partitionReleaser = new TestClusterPartitionReleaser(); + final ResourceManagerPartitionTracker tracker = new ResourceManagerPartitionTrackerImpl(partitionReleaser); + + // data set consists of 2 partitions but only 1 is being tracked -> incomplete and should not be listed (yet) + tracker.processTaskExecutorClusterPartitionReport( + TASK_EXECUTOR_ID_1, + createClusterPartitionReport(DATA_SET_ID, 2, PARTITION_ID_1)); + assertEquals(0, tracker.listDataSets().size()); + + // start tracking another partition, but we lost partition 1 so the data set is still incomplete + tracker.processTaskExecutorClusterPartitionReport( + TASK_EXECUTOR_ID_1, + createClusterPartitionReport(DATA_SET_ID, 2, PARTITION_ID_2)); + assertEquals(0, tracker.listDataSets().size()); + + // dataset is considered complete since all partitions are being tracked + tracker.processTaskExecutorClusterPartitionReport( + TASK_EXECUTOR_ID_1, + createClusterPartitionReport(DATA_SET_ID, 2, PARTITION_ID_1, PARTITION_ID_2)); + final Map<IntermediateDataSetID, DataSetMetaInfo> listing = tracker.listDataSets(); + assertThat(listing, hasKey(DATA_SET_ID)); + + // dataset is no longer considered complete since partition 2 was lost + tracker.processTaskExecutorClusterPartitionReport( + TASK_EXECUTOR_ID_1, + createClusterPartitionReport(DATA_SET_ID, 2, PARTITION_ID_1)); + 
assertEquals(0, tracker.listDataSets().size()); + } + + @Test + public void testListDataSetsMultiplePartitionsAcrossTaskExecutors() { + TestClusterPartitionReleaser partitionReleaser = new TestClusterPartitionReleaser(); + final ResourceManagerPartitionTracker tracker = new ResourceManagerPartitionTrackerImpl(partitionReleaser); + + tracker.processTaskExecutorClusterPartitionReport( + TASK_EXECUTOR_ID_1, + createClusterPartitionReport(DATA_SET_ID, 2, PARTITION_ID_1)); + tracker.processTaskExecutorClusterPartitionReport( + TASK_EXECUTOR_ID_2, + createClusterPartitionReport(DATA_SET_ID, 2, PARTITION_ID_2)); + final Map<IntermediateDataSetID, DataSetMetaInfo> listing = tracker.listDataSets(); + assertThat(listing, hasKey(DATA_SET_ID)); + + tracker.processTaskExecutorClusterPartitionReport( + TASK_EXECUTOR_ID_1, + EMPTY_PARTITION_REPORT); + assertEquals(0, tracker.listDataSets().size()); + } + + @Test + public void testReleasePartition() { + TestClusterPartitionReleaser partitionReleaser = new TestClusterPartitionReleaser(); + final ResourceManagerPartitionTracker tracker = new ResourceManagerPartitionTrackerImpl(partitionReleaser); + + tracker.processTaskExecutorClusterPartitionReport( + TASK_EXECUTOR_ID_1, + createClusterPartitionReport(DATA_SET_ID, 2, PARTITION_ID_1)); + + tracker.processTaskExecutorClusterPartitionReport( + TASK_EXECUTOR_ID_2, + createClusterPartitionReport(DATA_SET_ID, 2, PARTITION_ID_2)); + + tracker.releaseClusterPartitions(DATA_SET_ID); + + assertThat(partitionReleaser.releaseCalls, containsInAnyOrder( + Tuple2.of(TASK_EXECUTOR_ID_1, Collections.singleton(DATA_SET_ID)), + Tuple2.of(TASK_EXECUTOR_ID_2, Collections.singleton(DATA_SET_ID)))); + + // the data set should still be tracked, since the partition release was not confirmed yet by the task executors + assertThat(tracker.listDataSets().keySet(), contains(DATA_SET_ID)); + } + + @Test + public void testShutdownProcessing() { + TestClusterPartitionReleaser partitionReleaser = new 
TestClusterPartitionReleaser(); + final ResourceManagerPartitionTracker tracker = new ResourceManagerPartitionTrackerImpl(partitionReleaser); + + tracker.processTaskExecutorShutdown(TASK_EXECUTOR_ID_1); + assertThat(partitionReleaser.releaseCalls, empty()); + + tracker.processTaskExecutorClusterPartitionReport( + TASK_EXECUTOR_ID_1, + createClusterPartitionReport(DATA_SET_ID, 3, PARTITION_ID_1, PARTITION_ID_2)); + + tracker.processTaskExecutorClusterPartitionReport( + TASK_EXECUTOR_ID_2, + createClusterPartitionReport(DATA_SET_ID, 3, new ResultPartitionID())); + + tracker.processTaskExecutorShutdown(TASK_EXECUTOR_ID_1); + + assertThat(partitionReleaser.releaseCalls, contains(Tuple2.of(TASK_EXECUTOR_ID_2, Collections.singleton(DATA_SET_ID)))); + } + + private static ClusterPartitionReport createClusterPartitionReport(IntermediateDataSetID dataSetId, int numTotalPartitions, ResultPartitionID... partitionId) { + return new ClusterPartitionReport(Collections.singletonList( + new ClusterPartitionReport.ClusterPartitionReportEntry( + dataSetId, + new HashSet<>(Arrays.asList(partitionId)), + numTotalPartitions))); + } + + private static class TestClusterPartitionReleaser implements ClusterPartitionReleaser { + + final List<Tuple2<ResourceID, Set<IntermediateDataSetID>>> releaseCalls = new ArrayList<>(); Review comment: it is a field that is purposefully accessed from the outside and hence shouldn't be private for clarity reasons. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
