[ 
https://issues.apache.org/jira/browse/FLINK-10942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695672#comment-16695672
 ] 

ASF GitHub Bot commented on FLINK-10942:
----------------------------------------

pnowojski commented on a change in pull request #7146: 
[FLINK-10942][network,test] Deduplicate common codes in OutputEmitterTest
URL: https://github.com/apache/flink/pull/7146#discussion_r235635663
 
 

 ##########
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
 ##########
 @@ -48,404 +49,272 @@
 import java.io.PipedOutputStream;
 
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
+@SuppressWarnings({"unchecked", "rawtypes"})
 public class OutputEmitterTest {
-       
-       
+
        @Test
        public void testPartitionHash() {
                // Test for IntValue
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               final TypeComparator<Record> intComp = new 
RecordComparatorFactory(new int[] {0}, new Class[] 
{IntValue.class}).createComparator();
-               final ChannelSelector<SerializationDelegate<Record>> oe1 = new 
OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, intComp);
-               final SerializationDelegate<Record> delegate = new 
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-               
-               int numChans = 100;
-               int numRecs = 50000;
-               int[] hit = new int[numChans];
-
-               for (int i = 0; i < numRecs; i++) {
-                       IntValue k = new IntValue(i);
-                       Record rec = new Record(k);
-                       
-                       delegate.setInstance(rec);
-                       
-                       int[] chans = oe1.selectChannels(delegate, hit.length);
-                       for (int chan : chans) {
-                               hit[chan]++;
-                       }
-               }
-
-               int cnt = 0;
-               for (int aHit : hit) {
-                       assertTrue(aHit > 0);
-                       cnt += aHit;
-               }
-               assertTrue(cnt == numRecs);
-
+               verifyPartitionHashSelectedChannels(50000, 100, new 
Either.Left<>(0));
                // Test for StringValue
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               final TypeComparator<Record> stringComp = new 
RecordComparatorFactory(new int[] {0}, new Class[] 
{StringValue.class}).createComparator();
-               final ChannelSelector<SerializationDelegate<Record>> oe2 = new 
OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, stringComp);
-
-               numChans = 100;
-               numRecs = 10000;
-               
-               hit = new int[numChans];
-
-               for (int i = 0; i < numRecs; i++) {
-                       StringValue k = new StringValue(i + "");
-                       Record rec = new Record(k);
-                       delegate.setInstance(rec);
-                               
-                       int[] chans = oe2.selectChannels(delegate, hit.length);
-                       for (int chan : chans) {
-                               hit[chan]++;
-                       }
-               }
+               verifyPartitionHashSelectedChannels(10000, 100, new 
Either.Right<>(""));
 
-               cnt = 0;
-               for (int aHit : hit) {
-                       assertTrue(aHit > 0);
-                       cnt += aHit;
-               }
-               assertTrue(cnt == numRecs);
-               
-               // test hash corner cases
+               // Test hash corner cases
                final TestIntComparator testIntComp = new TestIntComparator();
-               final ChannelSelector<SerializationDelegate<Integer>> oe3 = new 
OutputEmitter<Integer>(ShipStrategyType.PARTITION_HASH, testIntComp);
-               final SerializationDelegate<Integer> intDel = new 
SerializationDelegate<Integer>(new IntSerializer());
-               
-               numChans = 100;
-               
+               final ChannelSelector<SerializationDelegate<Integer>> selector 
= new OutputEmitter<>(
+                       ShipStrategyType.PARTITION_HASH, testIntComp);
+               final SerializationDelegate<Integer> serializationDelegate = 
new SerializationDelegate<>(new IntSerializer());
+
                // MinVal hash
 
 Review comment:
   nit: I would drop `// MinVal hash`, `// -1 hash`, `// ... hash` comments 
here. They don't explain anything more then the 
`assertPartitionHashSelectedChannels` method name already does.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Deduplicate common codes in OutputEmitterTest
> ---------------------------------------------
>
>                 Key: FLINK-10942
>                 URL: https://issues.apache.org/jira/browse/FLINK-10942
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network, Tests
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 1.7.1
>
>
> There are many duplicated codes in {{OutputEmitterTest}} to make it difficult 
> to maintain. So it is necessary to abstract the common codes to make it 
> simple which brings benefits for the following refactor work in 
> {{ChannelSelector}} interface.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to