azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r290387680
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
##########
@@ -19,47 +19,107 @@
package org.apache.flink.runtime.deployment;
import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
/**
* Tests for the {@link ResultPartitionDeploymentDescriptor}.
*/
public class ResultPartitionDeploymentDescriptorTest {
+ private static final IntermediateDataSetID resultId = new
IntermediateDataSetID();
+
+ private static final IntermediateResultPartitionID partitionId = new
IntermediateResultPartitionID();
+ private static final ExecutionAttemptID producerExecutionId = new
ExecutionAttemptID();
+
+ private static final ResultPartitionType partitionType =
ResultPartitionType.PIPELINED;
+ private static final int numberOfSubpartitions = 24;
+ private static final int connectionIndex = 10;
+
+ private static final PartitionDescriptor partitionDescriptor = new
PartitionDescriptor(
+ resultId,
+ partitionId,
+ partitionType,
+ numberOfSubpartitions,
+ connectionIndex);
+
+ private static final ResultPartitionID resultPartitionID = new
ResultPartitionID(partitionId, producerExecutionId);
+
+ private static final ResourceID producerLocation = new
ResourceID("producerLocation");
+ private static final InetSocketAddress address = new
InetSocketAddress("localhost", 10000);
+ private static final ConnectionID connectionID = new
ConnectionID(address, connectionIndex);
+
+ /**
+ * Tests simple de/serialization with {@link UnknownShuffleDescriptor}.
+ */
+ @Test
+ public void testSerializationWithUnknownShuffleDescriptor() throws
Exception {
+ ShuffleDescriptor shuffleDescriptor = new
UnknownShuffleDescriptor(resultPartitionID);
+
+ ResultPartitionDeploymentDescriptor copy =
+
createCopyAndVerifyResultPartitionDeploymentDescriptor(shuffleDescriptor);
+
+ assertThat(copy.getShuffleDescriptor(),
instanceOf(UnknownShuffleDescriptor.class));
+ UnknownShuffleDescriptor copySdd = (UnknownShuffleDescriptor)
copy.getShuffleDescriptor();
Review comment:
leftover, will remove
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services