pnowojski closed pull request #7146: [FLINK-10942][network,test] Deduplicate 
common codes in OutputEmitterTest
URL: https://github.com/apache/flink/pull/7146
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff of a fork's pull request once it
has been merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
index 204e3f02c7c..5c7ed3ad148 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
@@ -35,11 +35,11 @@
 import org.apache.flink.types.DeserializationException;
 import org.apache.flink.types.DoubleValue;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.types.KeyFieldOutOfBoundsException;
 import org.apache.flink.types.NullKeyFieldException;
 import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
 
+import org.apache.flink.types.Value;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -48,404 +48,269 @@
 import java.io.PipedOutputStream;
 
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
+@SuppressWarnings({"unchecked", "rawtypes"})
 public class OutputEmitterTest {
-       
-       
+
        @Test
        public void testPartitionHash() {
                // Test for IntValue
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               final TypeComparator<Record> intComp = new 
RecordComparatorFactory(new int[] {0}, new Class[] 
{IntValue.class}).createComparator();
-               final ChannelSelector<SerializationDelegate<Record>> oe1 = new 
OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, intComp);
-               final SerializationDelegate<Record> delegate = new 
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-               
-               int numChans = 100;
-               int numRecs = 50000;
-               int[] hit = new int[numChans];
-
-               for (int i = 0; i < numRecs; i++) {
-                       IntValue k = new IntValue(i);
-                       Record rec = new Record(k);
-                       
-                       delegate.setInstance(rec);
-                       
-                       int[] chans = oe1.selectChannels(delegate, hit.length);
-                       for (int chan : chans) {
-                               hit[chan]++;
-                       }
-               }
-
-               int cnt = 0;
-               for (int aHit : hit) {
-                       assertTrue(aHit > 0);
-                       cnt += aHit;
-               }
-               assertTrue(cnt == numRecs);
-
+               verifyPartitionHashSelectedChannels(50000, 100, 
RecordType.INTEGER);
                // Test for StringValue
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               final TypeComparator<Record> stringComp = new 
RecordComparatorFactory(new int[] {0}, new Class[] 
{StringValue.class}).createComparator();
-               final ChannelSelector<SerializationDelegate<Record>> oe2 = new 
OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, stringComp);
+               verifyPartitionHashSelectedChannels(10000, 100, 
RecordType.STRING);
 
-               numChans = 100;
-               numRecs = 10000;
-               
-               hit = new int[numChans];
-
-               for (int i = 0; i < numRecs; i++) {
-                       StringValue k = new StringValue(i + "");
-                       Record rec = new Record(k);
-                       delegate.setInstance(rec);
-                               
-                       int[] chans = oe2.selectChannels(delegate, hit.length);
-                       for (int chan : chans) {
-                               hit[chan]++;
-                       }
-               }
-
-               cnt = 0;
-               for (int aHit : hit) {
-                       assertTrue(aHit > 0);
-                       cnt += aHit;
-               }
-               assertTrue(cnt == numRecs);
-               
-               // test hash corner cases
+               // Test hash corner cases
                final TestIntComparator testIntComp = new TestIntComparator();
-               final ChannelSelector<SerializationDelegate<Integer>> oe3 = new 
OutputEmitter<Integer>(ShipStrategyType.PARTITION_HASH, testIntComp);
-               final SerializationDelegate<Integer> intDel = new 
SerializationDelegate<Integer>(new IntSerializer());
-               
-               numChans = 100;
-               
-               // MinVal hash
-               intDel.setInstance(Integer.MIN_VALUE);
-               int[] chans = oe3.selectChannels(intDel, numChans);
-               assertTrue(chans.length == 1);
-               assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
-               
-               // -1 hash
-               intDel.setInstance(-1);
-               chans = oe3.selectChannels(intDel, hit.length);
-               assertTrue(chans.length == 1);
-               assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
-               
-               // 0 hash
-               intDel.setInstance(0);
-               chans = oe3.selectChannels(intDel, hit.length);
-               assertTrue(chans.length == 1);
-               assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
-               
-               // 1 hash
-               intDel.setInstance(1);
-               chans = oe3.selectChannels(intDel, hit.length);
-               assertTrue(chans.length == 1);
-               assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
-               
-               // MaxVal hash
-               intDel.setInstance(Integer.MAX_VALUE);
-               chans = oe3.selectChannels(intDel, hit.length);
-               assertTrue(chans.length == 1);
-               assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
+               final ChannelSelector<SerializationDelegate<Integer>> selector 
= new OutputEmitter<>(
+                       ShipStrategyType.PARTITION_HASH, testIntComp);
+               final SerializationDelegate<Integer> serializationDelegate = 
new SerializationDelegate<>(new IntSerializer());
+
+               assertPartitionHashSelectedChannels(selector, 
serializationDelegate, Integer.MIN_VALUE, 100);
+               assertPartitionHashSelectedChannels(selector, 
serializationDelegate, -1, 100);
+               assertPartitionHashSelectedChannels(selector, 
serializationDelegate, 0, 100);
+               assertPartitionHashSelectedChannels(selector, 
serializationDelegate, 1, 100);
+               assertPartitionHashSelectedChannels(selector, 
serializationDelegate, Integer.MAX_VALUE, 100);
        }
 
        @Test
        public void testForward() {
-               // Test for IntValue
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               final TypeComparator<Record> intComp = new 
RecordComparatorFactory(new int[] {0}, new Class[] 
{IntValue.class}).createComparator();
-               final ChannelSelector<SerializationDelegate<Record>> oe1 = new 
OutputEmitter<Record>(ShipStrategyType.FORWARD, intComp);
-               final SerializationDelegate<Record> delegate = new 
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
+               final int numChannels = 100;
 
-               int numChannels = 100;
+               // Test for IntValue
                int numRecords = 50000 + numChannels / 2;
-
-               int[] hit = new int[numChannels];
-
-               for (int i = 0; i < numRecords; i++) {
-                       IntValue k = new IntValue(i);
-                       Record rec = new Record(k);
-                       delegate.setInstance(rec);
-
-                       int[] chans = oe1.selectChannels(delegate, hit.length);
-                       for (int chan : chans) {
-                               hit[chan]++;
-                       }
-               }
-
-               assertTrue(hit[0] == numRecords);
-               for (int i = 1; i < hit.length; i++) {
-                       assertTrue(hit[i] == 0);
-               }
+               verifyForwardSelectedChannels(numRecords, numChannels, 
RecordType.INTEGER);
 
                // Test for StringValue
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               final TypeComparator<Record> stringComp = new 
RecordComparatorFactory(new int[] {0}, new Class[] 
{StringValue.class}).createComparator();
-               final ChannelSelector<SerializationDelegate<Record>> oe2 = new 
OutputEmitter<Record>(ShipStrategyType.FORWARD, stringComp);
-
-               numChannels = 100;
                numRecords = 10000 + numChannels / 2;
-
-               hit = new int[numChannels];
-
-               for (int i = 0; i < numRecords; i++) {
-                       StringValue k = new StringValue(i + "");
-                       Record rec = new Record(k);
-                       delegate.setInstance(rec);
-
-                       int[] chans = oe2.selectChannels(delegate, hit.length);
-                       for (int chan : chans) {
-                               hit[chan]++;
-                       }
-               }
-
-               assertTrue(hit[0] == numRecords);
-               for (int i = 1; i < hit.length; i++) {
-                       assertTrue(hit[i] == 0);
-               }
+               verifyForwardSelectedChannels(numRecords, numChannels, 
RecordType.STRING);
        }
 
        @Test
        public void testForcedRebalance() {
-               // Test for IntValue
-               int numChannels = 100;
-               int toTaskIndex = numChannels * 6/7;
+               final int numChannels = 100;
+               int toTaskIndex = numChannels * 6 / 7;
                int fromTaskIndex = toTaskIndex + numChannels;
                int extraRecords = numChannels / 3;
                int numRecords = 50000 + extraRecords;
+               final SerializationDelegate<Record> delegate = new 
SerializationDelegate<>(
+                       new RecordSerializerFactory().getSerializer());
+               final ChannelSelector<SerializationDelegate<Record>> selector = 
new OutputEmitter<>(
+                       ShipStrategyType.PARTITION_FORCED_REBALANCE, 
fromTaskIndex);
 
-               final ChannelSelector<SerializationDelegate<Record>> oe1 = new 
OutputEmitter<Record>(ShipStrategyType.PARTITION_FORCED_REBALANCE, 
fromTaskIndex);
-               final SerializationDelegate<Record> delegate = new 
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-
-               int[] hit = new int[numChannels];
-
-               for (int i = 0; i < numRecords; i++) {
-                       IntValue k = new IntValue(i);
-                       Record rec = new Record(k);
-                       delegate.setInstance(rec);
-
-                       int[] chans = oe1.selectChannels(delegate, hit.length);
-                       for (int chan : chans) {
-                               hit[chan]++;
-                       }
-               }
-
-               int cnt = 0;
-               for (int i = 0; i < hit.length; i++) {
-                       if (toTaskIndex <= i || i < 
toTaskIndex+extraRecords-numChannels) {
-                               assertTrue(hit[i] == 
(numRecords/numChannels)+1);
+               // Test for IntValue
+               int[] hits = getSelectedChannelsHitCount(selector, delegate, 
RecordType.INTEGER, numRecords, numChannels);
+               int totalHitCount = 0;
+               for (int i = 0; i < hits.length; i++) {
+                       if (toTaskIndex <= i || i < toTaskIndex+extraRecords - 
numChannels) {
+                               assertTrue(hits[i] == (numRecords / 
numChannels) + 1);
                        } else {
-                               assertTrue(hit[i] == numRecords/numChannels);
+                               assertTrue(hits[i] == numRecords/numChannels);
                        }
-                       cnt += hit[i];
+                       totalHitCount += hits[i];
                }
-               assertTrue(cnt == numRecords);
+               assertTrue(totalHitCount == numRecords);
 
-               // Test for StringValue
-               numChannels = 100;
                toTaskIndex = numChannels / 5;
                fromTaskIndex = toTaskIndex + 2 * numChannels;
-               extraRecords = numChannels * 2/9;
+               extraRecords = numChannels * 2 / 9;
                numRecords = 10000 + extraRecords;
 
-               final ChannelSelector<SerializationDelegate<Record>> oe2 = new 
OutputEmitter<Record>(ShipStrategyType.PARTITION_FORCED_REBALANCE, 
fromTaskIndex);
-
-               hit = new int[numChannels];
-
-               for (int i = 0; i < numRecords; i++) {
-                       StringValue k = new StringValue(i + "");
-                       Record rec = new Record(k);
-                       delegate.setInstance(rec);
-
-                       int[] chans = oe2.selectChannels(delegate, hit.length);
-                       for (int chan : chans) {
-                               hit[chan]++;
-                       }
-               }
-
-               cnt = 0;
-               for (int i = 0; i < hit.length; i++) {
-                       if (toTaskIndex <= i && i < toTaskIndex+extraRecords) {
-                               assertTrue(hit[i] == 
(numRecords/numChannels)+1);
+               // Test for StringValue
+               final ChannelSelector<SerializationDelegate<Record>> selector2 
= new OutputEmitter<>(
+                       ShipStrategyType.PARTITION_FORCED_REBALANCE, 
fromTaskIndex);
+               hits = getSelectedChannelsHitCount(selector2, delegate, 
RecordType.STRING, numRecords, numChannels);
+               totalHitCount = 0;
+               for (int i = 0; i < hits.length; i++) {
+                       if (toTaskIndex <= i && i < toTaskIndex + extraRecords) 
{
+                               assertTrue(hits[i] == (numRecords / 
numChannels) + 1);
                        } else {
-                               assertTrue(hit[i] == numRecords/numChannels);
+                               assertTrue(hits[i] == numRecords / numChannels);
                        }
-                       cnt += hit[i];
+                       totalHitCount += hits[i];
                }
-               assertTrue(cnt == numRecords);
+               assertTrue(totalHitCount == numRecords);
        }
        
        @Test
        public void testBroadcast() {
                // Test for IntValue
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               final TypeComparator<Record> intComp = new 
RecordComparatorFactory(new int[] {0}, new Class[] 
{IntValue.class}).createComparator();
-               final ChannelSelector<SerializationDelegate<Record>> oe1 = new 
OutputEmitter<Record>(ShipStrategyType.BROADCAST, intComp);
-               final SerializationDelegate<Record> delegate = new 
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-               
-               int numChannels = 100;
-               int numRecords = 50000;
-               
-               int[] hit = new int[numChannels];
-
-               for (int i = 0; i < numRecords; i++) {
-                       IntValue k = new IntValue(i);
-                       Record rec = new Record(k);
-                       delegate.setInstance(rec);
-                       
-                       int[] chans = oe1.selectChannels(delegate, hit.length);
-                       for (int chan : chans) {
-                               hit[chan]++;
-                       }
-               }
-
-               for (int aHit : hit) {
-                       assertTrue(aHit + "", aHit == numRecords);
-               }
-               
+               verifyBroadcastSelectedChannels(100, 50000, RecordType.INTEGER);
                // Test for StringValue
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               final TypeComparator<Record> stringComp = new 
RecordComparatorFactory(new int[] {0}, new Class[] 
{StringValue.class}).createComparator();
-               final ChannelSelector<SerializationDelegate<Record>> oe2 = new 
OutputEmitter<Record>(ShipStrategyType.BROADCAST, stringComp);
-
-               numChannels = 100;
-               numRecords = 5000;
-               
-               hit = new int[numChannels];
-
-               for (int i = 0; i < numRecords; i++) {
-                       StringValue k = new StringValue(i + "");
-                       Record rec = new Record(k);
-                       delegate.setInstance(rec);
-                               
-                       int[] chans = oe2.selectChannels(delegate, hit.length);
-                       for (int chan : chans) {
-                               hit[chan]++;
-                       }
-               }
-
-               for (int aHit : hit) {
-                       assertTrue(aHit + "", aHit == numRecords);
-               }
+               verifyBroadcastSelectedChannels(100, 50000, RecordType.STRING);
        }
        
        @Test
        public void testMultiKeys() {
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               final TypeComparator<Record> multiComp = new 
RecordComparatorFactory(new int[] {0,1,3}, new Class[] {IntValue.class, 
StringValue.class, DoubleValue.class}).createComparator();
-               final ChannelSelector<SerializationDelegate<Record>> oe1 = new 
OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, multiComp);
-               final SerializationDelegate<Record> delegate = new 
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
+               final TypeComparator<Record> multiComp = new 
RecordComparatorFactory(
+                       new int[] {0,1, 3}, new Class[] {IntValue.class, 
StringValue.class, DoubleValue.class}).createComparator();
+               final ChannelSelector<SerializationDelegate<Record>> selector = 
new OutputEmitter<>(
+                       ShipStrategyType.PARTITION_HASH, multiComp);
+               final SerializationDelegate<Record> delegate = new 
SerializationDelegate<>(new RecordSerializerFactory().getSerializer());
                
                int numChannels = 100;
                int numRecords = 5000;
-               
-               int[] hit = new int[numChannels];
-
+               int[] hits = new int[numChannels];
                for (int i = 0; i < numRecords; i++) {
-                       Record rec = new Record(4);
-                       rec.setField(0, new IntValue(i));
-                       rec.setField(1, new StringValue("AB"+i+"CD"+i));
-                       rec.setField(3, new DoubleValue(i*3.141d));
-                       delegate.setInstance(rec);
-                       
-                       int[] chans = oe1.selectChannels(delegate, hit.length);
-                       for (int chan : chans) {
-                               hit[chan]++;
+                       Record record = new Record(4);
+                       record.setField(0, new IntValue(i));
+                       record.setField(1, new StringValue("AB" + i + "CD" + 
i));
+                       record.setField(3, new DoubleValue(i * 3.141d));
+                       delegate.setInstance(record);
+
+                       int[] channels = selector.selectChannels(delegate, 
hits.length);
+                       for (int channel : channels) {
+                               hits[channel]++;
                        }
                }
 
-               int cnt = 0;
-               for (int aHit : hit) {
-                       assertTrue(aHit > 0);
-                       cnt += aHit;
+               int totalHitCount = 0;
+               for (int hit : hits) {
+                       assertTrue(hit > 0);
+                       totalHitCount += hit;
                }
-               assertTrue(cnt == numRecords);
-               
+               assertTrue(totalHitCount == numRecords);
        }
        
        @Test
        public void testMissingKey() {
-               // Test for IntValue
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               final TypeComparator<Record> intComp = new 
RecordComparatorFactory(new int[] {1}, new Class[] 
{IntValue.class}).createComparator();
-               final ChannelSelector<SerializationDelegate<Record>> oe1 = new 
OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, intComp);
-               final SerializationDelegate<Record> delegate = new 
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-               
-               Record rec = new Record(0);
-               rec.setField(0, new IntValue(1));
-               delegate.setInstance(rec);
-               
-               try {
-                       oe1.selectChannels(delegate, 100);
-               } catch (KeyFieldOutOfBoundsException re) {
-                       Assert.assertEquals(1, re.getFieldNumber());
-                       return;
+               if (!verifyWrongPartitionHashKey(1, 0)) {
+                       Assert.fail("Expected a KeyFieldOutOfBoundsException.");
                }
-               Assert.fail("Expected a KeyFieldOutOfBoundsException.");
        }
        
        @Test
        public void testNullKey() {
-               // Test for IntValue
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               final TypeComparator<Record> intComp = new 
RecordComparatorFactory(new int[] {0}, new Class[] 
{IntValue.class}).createComparator();
-               final ChannelSelector<SerializationDelegate<Record>> oe1 = new 
OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, intComp);
-               final SerializationDelegate<Record> delegate = new 
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-               
-               Record rec = new Record(2);
-               rec.setField(1, new IntValue(1));
-               delegate.setInstance(rec);
-
-               try {
-                       oe1.selectChannels(delegate, 100);
-               } catch (NullKeyFieldException re) {
-                       Assert.assertEquals(0, re.getFieldNumber());
-                       return;
+               if (!verifyWrongPartitionHashKey(0, 1)) {
+                       Assert.fail("Expected a NullKeyFieldException.");
                }
-               Assert.fail("Expected a NullKeyFieldException.");
        }
        
        @Test
-       public void testWrongKeyClass() {
-               
+       public void testWrongKeyClass() throws Exception {
                // Test for IntValue
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               final TypeComparator<Record> doubleComp = new 
RecordComparatorFactory(new int[] {0}, new Class[] 
{DoubleValue.class}).createComparator();
-               final ChannelSelector<SerializationDelegate<Record>> oe1 = new 
OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, doubleComp);
-               final SerializationDelegate<Record> delegate = new 
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-               
-               
-               ;
-               
-               Record rec = null;
-               
-               try {
-                       PipedInputStream pipedInput = new 
PipedInputStream(1024*1024);
-                       
-                       DataInputView in = new 
DataInputViewStreamWrapper(pipedInput);
-                       DataOutputView out = new 
DataOutputViewStreamWrapper(new PipedOutputStream(pipedInput));
-                       
-                       rec = new Record(1);
-                       rec.setField(0, new IntValue());
-                       
-                       rec.write(out);
-                       rec = new Record();
-                       rec.read(in);
-               } catch (IOException e) {
-                       fail("Test erroneous");
-               }
+               final TypeComparator<Record> doubleComp = new 
RecordComparatorFactory(
+                       new int[] {0}, new Class[] 
{DoubleValue.class}).createComparator();
+               final ChannelSelector<SerializationDelegate<Record>> selector = 
new OutputEmitter<>(
+                       ShipStrategyType.PARTITION_HASH, doubleComp);
+               final SerializationDelegate<Record> delegate = new 
SerializationDelegate<>(new RecordSerializerFactory().getSerializer());
+
+               PipedInputStream pipedInput = new PipedInputStream(1024 * 1024);
+               DataInputView in = new DataInputViewStreamWrapper(pipedInput);
+               DataOutputView out = new DataOutputViewStreamWrapper(new 
PipedOutputStream(pipedInput));
+
+               Record record = new Record(1);
+               record.setField(0, new IntValue());
+               record.write(out);
+               record = new Record();
+               record.read(in);
 
                try {
-                       delegate.setInstance(rec);
-                       oe1.selectChannels(delegate, 100);
+                       delegate.setInstance(record);
+                       selector.selectChannels(delegate, 100);
                } catch (DeserializationException re) {
                        return;
                }
                Assert.fail("Expected a NullKeyFieldException.");
        }
-       
-       @SuppressWarnings({"serial", "rawtypes"})
+
+       private void verifyPartitionHashSelectedChannels(int numRecords, int 
numChannels, Enum recordType) {
+               int[] hits = 
getSelectedChannelsHitCount(ShipStrategyType.PARTITION_HASH, numRecords, 
numChannels, recordType);
+
+               int totalHitCount = 0;
+               for (int hit : hits) {
+                       assertTrue(hit > 0);
+                       totalHitCount += hit;
+               }
+               assertTrue(totalHitCount == numRecords);
+       }
+
+       private void assertPartitionHashSelectedChannels(
+                       ChannelSelector selector,
+                       SerializationDelegate<Integer> serializationDelegate,
+                       int record,
+                       int numChannels) {
+               serializationDelegate.setInstance(record);
+               int[] selectedChannels = 
selector.selectChannels(serializationDelegate, numChannels);
+
+               assertTrue(selectedChannels.length == 1);
+               assertTrue(selectedChannels[0] >= 0 && selectedChannels[0] <= 
numChannels - 1);
+       }
+
+       private void verifyForwardSelectedChannels(int numRecords, int 
numChannels, Enum recordType) {
+               int[] hits = 
getSelectedChannelsHitCount(ShipStrategyType.FORWARD, numRecords, numChannels, 
recordType);
+
+               assertTrue(hits[0] == numRecords);
+               for (int i = 1; i < hits.length; i++) {
+                       assertTrue(hits[i] == 0);
+               }
+       }
+
+       private void verifyBroadcastSelectedChannels(int numRecords, int 
numChannels, Enum recordType) {
+               int[] hits = 
getSelectedChannelsHitCount(ShipStrategyType.BROADCAST, numRecords, 
numChannels, recordType);
+
+               for (int hit : hits) {
+                       assertTrue(hit + "", hit == numRecords);
+               }
+       }
+
+       private boolean verifyWrongPartitionHashKey(int position, int fieldNum) 
{
+               final TypeComparator<Record> comparator = new 
RecordComparatorFactory(
+                       new int[] {position}, new Class[] 
{IntValue.class}).createComparator();
+               final ChannelSelector<SerializationDelegate<Record>> selector = 
new OutputEmitter<>(
+                       ShipStrategyType.PARTITION_HASH, comparator);
+               final SerializationDelegate<Record> delegate = new 
SerializationDelegate<>(new RecordSerializerFactory().getSerializer());
+
+               Record record = new Record(2);
+               record.setField(fieldNum, new IntValue(1));
+               delegate.setInstance(record);
+
+               try {
+                       selector.selectChannels(delegate, 100);
+               } catch (NullKeyFieldException re) {
+                       Assert.assertEquals(position, re.getFieldNumber());
+                       return true;
+               }
+               return false;
+       }
+
+       private int[] getSelectedChannelsHitCount(
+                       ShipStrategyType shipStrategyType,
+                       int numRecords,
+                       int numChannels,
+                       Enum recordType) {
+               final TypeComparator<Record> comparator = new 
RecordComparatorFactory(
+                       new int[] {0}, new Class[] {recordType == 
RecordType.INTEGER ? IntValue.class : StringValue.class}).createComparator();
+               final ChannelSelector<SerializationDelegate<Record>> selector = 
new OutputEmitter<>(shipStrategyType, comparator);
+               final SerializationDelegate<Record> delegate = new 
SerializationDelegate<>(new RecordSerializerFactory().getSerializer());
+
+               return getSelectedChannelsHitCount(selector, delegate, 
recordType, numRecords, numChannels);
+       }
+
+       private int[] getSelectedChannelsHitCount(
+                       ChannelSelector<SerializationDelegate<Record>> selector,
+                       SerializationDelegate<Record> delegate,
+                       Enum recordType,
+                       int numRecords,
+                       int numChannels) {
+               int[] hits = new int[numChannels];
+               Value value;
+               for (int i = 0; i < numRecords; i++) {
+                       if (recordType == RecordType.INTEGER) {
+                               value = new IntValue(i);
+                       } else {
+                               value = new StringValue(i + "");
+                       }
+                       Record record = new Record(value);
+                       delegate.setInstance(record);
+
+                       int[] channels = selector.selectChannels(delegate, 
hits.length);
+                       for (int channel : channels) {
+                               hits[channel]++;
+                       }
+               }
+               return hits;
+       }
+
        private static class TestIntComparator extends TypeComparator<Integer> {
                private TypeComparator[] comparators = new TypeComparator[]{new 
IntComparator(true)};
 
@@ -517,4 +382,9 @@ public int extractKeys(Object record, Object[] target, int 
index) {
                        return comparators;
                }
        }
+
+       private enum RecordType {
+               STRING,
+               INTEGER
+       }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to