[ 
https://issues.apache.org/jira/browse/FLINK-10942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695673#comment-16695673
 ] 

ASF GitHub Bot commented on FLINK-10942:
----------------------------------------

pnowojski commented on a change in pull request #7146: 
[FLINK-10942][network,test] Deduplicate common codes in OutputEmitterTest
URL: https://github.com/apache/flink/pull/7146#discussion_r235636829
 
 

 ##########
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
 ##########
 @@ -48,404 +49,272 @@
 import java.io.PipedOutputStream;
 
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
+@SuppressWarnings({"unchecked", "rawtypes"})
 public class OutputEmitterTest {
-       
-       
+
        @Test
        public void testPartitionHash() {
                // Test for IntValue
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               final TypeComparator<Record> intComp = new 
RecordComparatorFactory(new int[] {0}, new Class[] 
{IntValue.class}).createComparator();
-               final ChannelSelector<SerializationDelegate<Record>> oe1 = new 
OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, intComp);
-               final SerializationDelegate<Record> delegate = new 
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-               
-               int numChans = 100;
-               int numRecs = 50000;
-               int[] hit = new int[numChans];
-
-               for (int i = 0; i < numRecs; i++) {
-                       IntValue k = new IntValue(i);
-                       Record rec = new Record(k);
-                       
-                       delegate.setInstance(rec);
-                       
-                       int[] chans = oe1.selectChannels(delegate, hit.length);
-                       for (int chan : chans) {
-                               hit[chan]++;
-                       }
-               }
-
-               int cnt = 0;
-               for (int aHit : hit) {
-                       assertTrue(aHit > 0);
-                       cnt += aHit;
-               }
-               assertTrue(cnt == numRecs);
-
+               verifyPartitionHashSelectedChannels(50000, 100, new 
Either.Left<>(0));
                // Test for StringValue
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               final TypeComparator<Record> stringComp = new 
RecordComparatorFactory(new int[] {0}, new Class[] 
{StringValue.class}).createComparator();
-               final ChannelSelector<SerializationDelegate<Record>> oe2 = new 
OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, stringComp);
-
-               numChans = 100;
-               numRecs = 10000;
-               
-               hit = new int[numChans];
-
-               for (int i = 0; i < numRecs; i++) {
-                       StringValue k = new StringValue(i + "");
-                       Record rec = new Record(k);
-                       delegate.setInstance(rec);
-                               
-                       int[] chans = oe2.selectChannels(delegate, hit.length);
-                       for (int chan : chans) {
-                               hit[chan]++;
-                       }
-               }
+               verifyPartitionHashSelectedChannels(10000, 100, new 
Either.Right<>(""));
 
-               cnt = 0;
-               for (int aHit : hit) {
-                       assertTrue(aHit > 0);
-                       cnt += aHit;
-               }
-               assertTrue(cnt == numRecs);
-               
-               // test hash corner cases
+               // Test hash corner cases
                final TestIntComparator testIntComp = new TestIntComparator();
-               final ChannelSelector<SerializationDelegate<Integer>> oe3 = new 
OutputEmitter<Integer>(ShipStrategyType.PARTITION_HASH, testIntComp);
-               final SerializationDelegate<Integer> intDel = new 
SerializationDelegate<Integer>(new IntSerializer());
-               
-               numChans = 100;
-               
+               final ChannelSelector<SerializationDelegate<Integer>> selector 
= new OutputEmitter<>(
+                       ShipStrategyType.PARTITION_HASH, testIntComp);
+               final SerializationDelegate<Integer> serializationDelegate = 
new SerializationDelegate<>(new IntSerializer());
+
                // MinVal hash
-               intDel.setInstance(Integer.MIN_VALUE);
-               int[] chans = oe3.selectChannels(intDel, numChans);
-               assertTrue(chans.length == 1);
-               assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
-               
+               assertPartitionHashSelectedChannels(selector, 
serializationDelegate, Integer.MIN_VALUE, 100);
                // -1 hash
-               intDel.setInstance(-1);
-               chans = oe3.selectChannels(intDel, hit.length);
-               assertTrue(chans.length == 1);
-               assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
-               
+               assertPartitionHashSelectedChannels(selector, 
serializationDelegate, -1, 100);
                // 0 hash
-               intDel.setInstance(0);
-               chans = oe3.selectChannels(intDel, hit.length);
-               assertTrue(chans.length == 1);
-               assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
-               
+               assertPartitionHashSelectedChannels(selector, 
serializationDelegate, 0, 100);
                // 1 hash
-               intDel.setInstance(1);
-               chans = oe3.selectChannels(intDel, hit.length);
-               assertTrue(chans.length == 1);
-               assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
-               
+               assertPartitionHashSelectedChannels(selector, 
serializationDelegate, 1, 100);
                // MaxVal hash
-               intDel.setInstance(Integer.MAX_VALUE);
-               chans = oe3.selectChannels(intDel, hit.length);
-               assertTrue(chans.length == 1);
-               assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
+               assertPartitionHashSelectedChannels(selector, 
serializationDelegate, Integer.MAX_VALUE, 100);
        }
 
        @Test
        public void testForward() {
-               // Test for IntValue
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               final TypeComparator<Record> intComp = new 
RecordComparatorFactory(new int[] {0}, new Class[] 
{IntValue.class}).createComparator();
-               final ChannelSelector<SerializationDelegate<Record>> oe1 = new 
OutputEmitter<Record>(ShipStrategyType.FORWARD, intComp);
-               final SerializationDelegate<Record> delegate = new 
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
+               final int numChannels = 100;
 
-               int numChannels = 100;
+               // Test for IntValue
                int numRecords = 50000 + numChannels / 2;
-
-               int[] hit = new int[numChannels];
-
-               for (int i = 0; i < numRecords; i++) {
-                       IntValue k = new IntValue(i);
-                       Record rec = new Record(k);
-                       delegate.setInstance(rec);
-
-                       int[] chans = oe1.selectChannels(delegate, hit.length);
-                       for (int chan : chans) {
-                               hit[chan]++;
-                       }
-               }
-
-               assertTrue(hit[0] == numRecords);
-               for (int i = 1; i < hit.length; i++) {
-                       assertTrue(hit[i] == 0);
-               }
+               verifyForwardSelectedChannels(numRecords, numChannels, new 
Either.Left<>(0));
 
                // Test for StringValue
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               final TypeComparator<Record> stringComp = new 
RecordComparatorFactory(new int[] {0}, new Class[] 
{StringValue.class}).createComparator();
-               final ChannelSelector<SerializationDelegate<Record>> oe2 = new 
OutputEmitter<Record>(ShipStrategyType.FORWARD, stringComp);
-
-               numChannels = 100;
                numRecords = 10000 + numChannels / 2;
-
-               hit = new int[numChannels];
-
-               for (int i = 0; i < numRecords; i++) {
-                       StringValue k = new StringValue(i + "");
-                       Record rec = new Record(k);
-                       delegate.setInstance(rec);
-
-                       int[] chans = oe2.selectChannels(delegate, hit.length);
-                       for (int chan : chans) {
-                               hit[chan]++;
-                       }
-               }
-
-               assertTrue(hit[0] == numRecords);
-               for (int i = 1; i < hit.length; i++) {
-                       assertTrue(hit[i] == 0);
-               }
+               verifyForwardSelectedChannels(numRecords, numChannels, new 
Either.Right<>(""));
        }
 
        @Test
        public void testForcedRebalance() {
-               // Test for IntValue
-               int numChannels = 100;
-               int toTaskIndex = numChannels * 6/7;
+               final int numChannels = 100;
+               int toTaskIndex = numChannels * 6 / 7;
                int fromTaskIndex = toTaskIndex + numChannels;
                int extraRecords = numChannels / 3;
                int numRecords = 50000 + extraRecords;
+               final SerializationDelegate<Record> delegate = new 
SerializationDelegate<>(
+                       new RecordSerializerFactory().getSerializer());
+               final ChannelSelector<SerializationDelegate<Record>> selector = 
new OutputEmitter<>(
+                       ShipStrategyType.PARTITION_FORCED_REBALANCE, 
fromTaskIndex);
 
-               final ChannelSelector<SerializationDelegate<Record>> oe1 = new 
OutputEmitter<Record>(ShipStrategyType.PARTITION_FORCED_REBALANCE, 
fromTaskIndex);
-               final SerializationDelegate<Record> delegate = new 
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-
-               int[] hit = new int[numChannels];
-
-               for (int i = 0; i < numRecords; i++) {
-                       IntValue k = new IntValue(i);
-                       Record rec = new Record(k);
-                       delegate.setInstance(rec);
-
-                       int[] chans = oe1.selectChannels(delegate, hit.length);
-                       for (int chan : chans) {
-                               hit[chan]++;
-                       }
-               }
-
+               // Test for IntValue
+               int[] hits = getSelectedChannels(selector, delegate, new 
Either.Left<>(0), numRecords, numChannels);
                int cnt = 0;
-               for (int i = 0; i < hit.length; i++) {
-                       if (toTaskIndex <= i || i < 
toTaskIndex+extraRecords-numChannels) {
-                               assertTrue(hit[i] == 
(numRecords/numChannels)+1);
+               for (int i = 0; i < hits.length; i++) {
+                       if (toTaskIndex <= i || i < toTaskIndex+extraRecords - 
numChannels) {
+                               assertTrue(hits[i] == (numRecords / 
numChannels) + 1);
                        } else {
-                               assertTrue(hit[i] == numRecords/numChannels);
+                               assertTrue(hits[i] == numRecords/numChannels);
                        }
-                       cnt += hit[i];
+                       cnt += hits[i];
                }
                assertTrue(cnt == numRecords);
 
-               // Test for StringValue
-               numChannels = 100;
                toTaskIndex = numChannels / 5;
                fromTaskIndex = toTaskIndex + 2 * numChannels;
-               extraRecords = numChannels * 2/9;
+               extraRecords = numChannels * 2 / 9;
                numRecords = 10000 + extraRecords;
 
-               final ChannelSelector<SerializationDelegate<Record>> oe2 = new 
OutputEmitter<Record>(ShipStrategyType.PARTITION_FORCED_REBALANCE, 
fromTaskIndex);
-
-               hit = new int[numChannels];
-
-               for (int i = 0; i < numRecords; i++) {
-                       StringValue k = new StringValue(i + "");
-                       Record rec = new Record(k);
-                       delegate.setInstance(rec);
-
-                       int[] chans = oe2.selectChannels(delegate, hit.length);
-                       for (int chan : chans) {
-                               hit[chan]++;
-                       }
-               }
-
+               // Test for StringValue
+               final ChannelSelector<SerializationDelegate<Record>> selector2 
= new OutputEmitter<>(
+                       ShipStrategyType.PARTITION_FORCED_REBALANCE, 
fromTaskIndex);
+               hits = getSelectedChannels(selector2, delegate, new 
Either.Right<>(""), numRecords, numChannels);
                cnt = 0;
-               for (int i = 0; i < hit.length; i++) {
-                       if (toTaskIndex <= i && i < toTaskIndex+extraRecords) {
-                               assertTrue(hit[i] == 
(numRecords/numChannels)+1);
+               for (int i = 0; i < hits.length; i++) {
+                       if (toTaskIndex <= i && i < toTaskIndex + extraRecords) 
{
+                               assertTrue(hits[i] == (numRecords / 
numChannels) + 1);
                        } else {
-                               assertTrue(hit[i] == numRecords/numChannels);
+                               assertTrue(hits[i] == numRecords / numChannels);
                        }
-                       cnt += hit[i];
+                       cnt += hits[i];
                }
                assertTrue(cnt == numRecords);
        }
        
        @Test
        public void testBroadcast() {
                // Test for IntValue
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               final TypeComparator<Record> intComp = new 
RecordComparatorFactory(new int[] {0}, new Class[] 
{IntValue.class}).createComparator();
-               final ChannelSelector<SerializationDelegate<Record>> oe1 = new 
OutputEmitter<Record>(ShipStrategyType.BROADCAST, intComp);
-               final SerializationDelegate<Record> delegate = new 
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-               
-               int numChannels = 100;
-               int numRecords = 50000;
-               
-               int[] hit = new int[numChannels];
-
-               for (int i = 0; i < numRecords; i++) {
-                       IntValue k = new IntValue(i);
-                       Record rec = new Record(k);
-                       delegate.setInstance(rec);
-                       
-                       int[] chans = oe1.selectChannels(delegate, hit.length);
-                       for (int chan : chans) {
-                               hit[chan]++;
-                       }
-               }
-
-               for (int aHit : hit) {
-                       assertTrue(aHit + "", aHit == numRecords);
-               }
-               
+               verifyBroadcastSelectedChannels(100, 50000, new 
Either.Left<>(0));
                // Test for StringValue
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               final TypeComparator<Record> stringComp = new 
RecordComparatorFactory(new int[] {0}, new Class[] 
{StringValue.class}).createComparator();
-               final ChannelSelector<SerializationDelegate<Record>> oe2 = new 
OutputEmitter<Record>(ShipStrategyType.BROADCAST, stringComp);
-
-               numChannels = 100;
-               numRecords = 5000;
-               
-               hit = new int[numChannels];
-
-               for (int i = 0; i < numRecords; i++) {
-                       StringValue k = new StringValue(i + "");
-                       Record rec = new Record(k);
-                       delegate.setInstance(rec);
-                               
-                       int[] chans = oe2.selectChannels(delegate, hit.length);
-                       for (int chan : chans) {
-                               hit[chan]++;
-                       }
-               }
-
-               for (int aHit : hit) {
-                       assertTrue(aHit + "", aHit == numRecords);
-               }
+               verifyBroadcastSelectedChannels(100, 50000, new 
Either.Right<>(""));
        }
        
        @Test
        public void testMultiKeys() {
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               final TypeComparator<Record> multiComp = new 
RecordComparatorFactory(new int[] {0,1,3}, new Class[] {IntValue.class, 
StringValue.class, DoubleValue.class}).createComparator();
-               final ChannelSelector<SerializationDelegate<Record>> oe1 = new 
OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, multiComp);
-               final SerializationDelegate<Record> delegate = new 
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
+               final TypeComparator<Record> multiComp = new 
RecordComparatorFactory(
+                       new int[] {0,1, 3}, new Class[] {IntValue.class, 
StringValue.class, DoubleValue.class}).createComparator();
+               final ChannelSelector<SerializationDelegate<Record>> selector = 
new OutputEmitter<>(ShipStrategyType.PARTITION_HASH, multiComp);
+               final SerializationDelegate<Record> delegate = new 
SerializationDelegate<>(new RecordSerializerFactory().getSerializer());
                
                int numChannels = 100;
                int numRecords = 5000;
-               
-               int[] hit = new int[numChannels];
-
+               int[] hits = new int[numChannels];
                for (int i = 0; i < numRecords; i++) {
-                       Record rec = new Record(4);
-                       rec.setField(0, new IntValue(i));
-                       rec.setField(1, new StringValue("AB"+i+"CD"+i));
-                       rec.setField(3, new DoubleValue(i*3.141d));
-                       delegate.setInstance(rec);
-                       
-                       int[] chans = oe1.selectChannels(delegate, hit.length);
-                       for (int chan : chans) {
-                               hit[chan]++;
+                       Record record = new Record(4);
+                       record.setField(0, new IntValue(i));
+                       record.setField(1, new StringValue("AB" + i + "CD" + 
i));
+                       record.setField(3, new DoubleValue(i * 3.141d));
+                       delegate.setInstance(record);
+
+                       int[] channels = selector.selectChannels(delegate, 
hits.length);
+                       for (int channel : channels) {
+                               hits[channel]++;
                        }
                }
 
                int cnt = 0;
-               for (int aHit : hit) {
-                       assertTrue(aHit > 0);
-                       cnt += aHit;
+               for (int hit : hits) {
+                       assertTrue(hit > 0);
+                       cnt += hit;
                }
                assertTrue(cnt == numRecords);
-               
        }
        
        @Test
        public void testMissingKey() {
-               // Test for IntValue
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               final TypeComparator<Record> intComp = new 
RecordComparatorFactory(new int[] {1}, new Class[] 
{IntValue.class}).createComparator();
-               final ChannelSelector<SerializationDelegate<Record>> oe1 = new 
OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, intComp);
-               final SerializationDelegate<Record> delegate = new 
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-               
-               Record rec = new Record(0);
-               rec.setField(0, new IntValue(1));
-               delegate.setInstance(rec);
-               
-               try {
-                       oe1.selectChannels(delegate, 100);
-               } catch (KeyFieldOutOfBoundsException re) {
-                       Assert.assertEquals(1, re.getFieldNumber());
-                       return;
+               if (!verifyWrongPartitionHashKey(1, 0)) {
+                       Assert.fail("Expected a KeyFieldOutOfBoundsException.");
                }
-               Assert.fail("Expected a KeyFieldOutOfBoundsException.");
        }
        
        @Test
        public void testNullKey() {
-               // Test for IntValue
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               final TypeComparator<Record> intComp = new 
RecordComparatorFactory(new int[] {0}, new Class[] 
{IntValue.class}).createComparator();
-               final ChannelSelector<SerializationDelegate<Record>> oe1 = new 
OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, intComp);
-               final SerializationDelegate<Record> delegate = new 
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-               
-               Record rec = new Record(2);
-               rec.setField(1, new IntValue(1));
-               delegate.setInstance(rec);
-
-               try {
-                       oe1.selectChannels(delegate, 100);
-               } catch (NullKeyFieldException re) {
-                       Assert.assertEquals(0, re.getFieldNumber());
-                       return;
+               if (!verifyWrongPartitionHashKey(0, 1)) {
+                       Assert.fail("Expected a NullKeyFieldException.");
                }
-               Assert.fail("Expected a NullKeyFieldException.");
        }
        
        @Test
-       public void testWrongKeyClass() {
-               
+       public void testWrongKeyClass() throws Exception {
                // Test for IntValue
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               final TypeComparator<Record> doubleComp = new 
RecordComparatorFactory(new int[] {0}, new Class[] 
{DoubleValue.class}).createComparator();
-               final ChannelSelector<SerializationDelegate<Record>> oe1 = new 
OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, doubleComp);
-               final SerializationDelegate<Record> delegate = new 
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-               
-               
-               ;
-               
-               Record rec = null;
-               
-               try {
-                       PipedInputStream pipedInput = new 
PipedInputStream(1024*1024);
-                       
-                       DataInputView in = new 
DataInputViewStreamWrapper(pipedInput);
-                       DataOutputView out = new 
DataOutputViewStreamWrapper(new PipedOutputStream(pipedInput));
-                       
-                       rec = new Record(1);
-                       rec.setField(0, new IntValue());
-                       
-                       rec.write(out);
-                       rec = new Record();
-                       rec.read(in);
-               } catch (IOException e) {
-                       fail("Test erroneous");
-               }
+               final TypeComparator<Record> doubleComp = new 
RecordComparatorFactory(
+                       new int[] {0}, new Class[] 
{DoubleValue.class}).createComparator();
+               final ChannelSelector<SerializationDelegate<Record>> selector = 
new OutputEmitter<>(
+                       ShipStrategyType.PARTITION_HASH, doubleComp);
+               final SerializationDelegate<Record> delegate = new 
SerializationDelegate<>(new RecordSerializerFactory().getSerializer());
+
+               PipedInputStream pipedInput = new PipedInputStream(1024 * 1024);
+               DataInputView in = new DataInputViewStreamWrapper(pipedInput);
+               DataOutputView out = new DataOutputViewStreamWrapper(new 
PipedOutputStream(pipedInput));
+
+               Record record = new Record(1);
+               record.setField(0, new IntValue());
+               record.write(out);
+               record = new Record();
+               record.read(in);
 
                try {
-                       delegate.setInstance(rec);
-                       oe1.selectChannels(delegate, 100);
+                       delegate.setInstance(record);
+                       selector.selectChannels(delegate, 100);
                } catch (DeserializationException re) {
                        return;
                }
                Assert.fail("Expected a NullKeyFieldException.");
        }
-       
-       @SuppressWarnings({"serial", "rawtypes"})
+
+       private boolean verifyWrongPartitionHashKey(int position, int fieldNum) 
{
+               final TypeComparator<Record> comparator = new 
RecordComparatorFactory(
+                       new int[] {position}, new Class[] 
{IntValue.class}).createComparator();
+               final ChannelSelector<SerializationDelegate<Record>> selector = 
new OutputEmitter<>(
+                       ShipStrategyType.PARTITION_HASH, comparator);
+               final SerializationDelegate<Record> delegate = new 
SerializationDelegate<>(new RecordSerializerFactory().getSerializer());
+
+               Record record = new Record(2);
+               record.setField(fieldNum, new IntValue(1));
+               delegate.setInstance(record);
+
+               try {
+                       selector.selectChannels(delegate, 100);
+               } catch (NullKeyFieldException re) {
+                       Assert.assertEquals(position, re.getFieldNumber());
+                       return true;
+               }
+               return false;
+       }
+
+       private int[] getSelectedChannels(
+               ShipStrategyType shipStrategyType,
+               int numRecords,
+               int numChannels,
+               Either<Integer, String> recordType) {
 
 Review comment:
   Formatting nit: could you indent the parameters by one more tab? As it is 
now, the parameters visually run into the method body.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Deduplicate common codes in OutputEmitterTest
> ---------------------------------------------
>
>                 Key: FLINK-10942
>                 URL: https://issues.apache.org/jira/browse/FLINK-10942
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network, Tests
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 1.7.1
>
>
> {{OutputEmitterTest}} contains a lot of duplicated code, which makes it 
> difficult to maintain. The common code should therefore be abstracted out to 
> simplify the test, which also benefits the upcoming refactoring of the 
> {{ChannelSelector}} interface.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to