pnowojski closed pull request #7146: [FLINK-10942][network,test] Deduplicate
common codes in OutputEmitterTest
URL: https://github.com/apache/flink/pull/7146
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
index 204e3f02c7c..5c7ed3ad148 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
@@ -35,11 +35,11 @@
import org.apache.flink.types.DeserializationException;
import org.apache.flink.types.DoubleValue;
import org.apache.flink.types.IntValue;
-import org.apache.flink.types.KeyFieldOutOfBoundsException;
import org.apache.flink.types.NullKeyFieldException;
import org.apache.flink.types.Record;
import org.apache.flink.types.StringValue;
+import org.apache.flink.types.Value;
import org.junit.Assert;
import org.junit.Test;
@@ -48,404 +48,269 @@
import java.io.PipedOutputStream;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+@SuppressWarnings({"unchecked", "rawtypes"})
public class OutputEmitterTest {
-
-
+
@Test
public void testPartitionHash() {
// Test for IntValue
- @SuppressWarnings({"unchecked", "rawtypes"})
- final TypeComparator<Record> intComp = new
RecordComparatorFactory(new int[] {0}, new Class[]
{IntValue.class}).createComparator();
- final ChannelSelector<SerializationDelegate<Record>> oe1 = new
OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, intComp);
- final SerializationDelegate<Record> delegate = new
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-
- int numChans = 100;
- int numRecs = 50000;
- int[] hit = new int[numChans];
-
- for (int i = 0; i < numRecs; i++) {
- IntValue k = new IntValue(i);
- Record rec = new Record(k);
-
- delegate.setInstance(rec);
-
- int[] chans = oe1.selectChannels(delegate, hit.length);
- for (int chan : chans) {
- hit[chan]++;
- }
- }
-
- int cnt = 0;
- for (int aHit : hit) {
- assertTrue(aHit > 0);
- cnt += aHit;
- }
- assertTrue(cnt == numRecs);
-
+ verifyPartitionHashSelectedChannels(50000, 100,
RecordType.INTEGER);
// Test for StringValue
- @SuppressWarnings({"unchecked", "rawtypes"})
- final TypeComparator<Record> stringComp = new
RecordComparatorFactory(new int[] {0}, new Class[]
{StringValue.class}).createComparator();
- final ChannelSelector<SerializationDelegate<Record>> oe2 = new
OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, stringComp);
+ verifyPartitionHashSelectedChannels(10000, 100,
RecordType.STRING);
- numChans = 100;
- numRecs = 10000;
-
- hit = new int[numChans];
-
- for (int i = 0; i < numRecs; i++) {
- StringValue k = new StringValue(i + "");
- Record rec = new Record(k);
- delegate.setInstance(rec);
-
- int[] chans = oe2.selectChannels(delegate, hit.length);
- for (int chan : chans) {
- hit[chan]++;
- }
- }
-
- cnt = 0;
- for (int aHit : hit) {
- assertTrue(aHit > 0);
- cnt += aHit;
- }
- assertTrue(cnt == numRecs);
-
- // test hash corner cases
+ // Test hash corner cases
final TestIntComparator testIntComp = new TestIntComparator();
- final ChannelSelector<SerializationDelegate<Integer>> oe3 = new
OutputEmitter<Integer>(ShipStrategyType.PARTITION_HASH, testIntComp);
- final SerializationDelegate<Integer> intDel = new
SerializationDelegate<Integer>(new IntSerializer());
-
- numChans = 100;
-
- // MinVal hash
- intDel.setInstance(Integer.MIN_VALUE);
- int[] chans = oe3.selectChannels(intDel, numChans);
- assertTrue(chans.length == 1);
- assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
-
- // -1 hash
- intDel.setInstance(-1);
- chans = oe3.selectChannels(intDel, hit.length);
- assertTrue(chans.length == 1);
- assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
-
- // 0 hash
- intDel.setInstance(0);
- chans = oe3.selectChannels(intDel, hit.length);
- assertTrue(chans.length == 1);
- assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
-
- // 1 hash
- intDel.setInstance(1);
- chans = oe3.selectChannels(intDel, hit.length);
- assertTrue(chans.length == 1);
- assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
-
- // MaxVal hash
- intDel.setInstance(Integer.MAX_VALUE);
- chans = oe3.selectChannels(intDel, hit.length);
- assertTrue(chans.length == 1);
- assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
+ final ChannelSelector<SerializationDelegate<Integer>> selector
= new OutputEmitter<>(
+ ShipStrategyType.PARTITION_HASH, testIntComp);
+ final SerializationDelegate<Integer> serializationDelegate =
new SerializationDelegate<>(new IntSerializer());
+
+ assertPartitionHashSelectedChannels(selector,
serializationDelegate, Integer.MIN_VALUE, 100);
+ assertPartitionHashSelectedChannels(selector,
serializationDelegate, -1, 100);
+ assertPartitionHashSelectedChannels(selector,
serializationDelegate, 0, 100);
+ assertPartitionHashSelectedChannels(selector,
serializationDelegate, 1, 100);
+ assertPartitionHashSelectedChannels(selector,
serializationDelegate, Integer.MAX_VALUE, 100);
}
@Test
public void testForward() {
- // Test for IntValue
- @SuppressWarnings({"unchecked", "rawtypes"})
- final TypeComparator<Record> intComp = new
RecordComparatorFactory(new int[] {0}, new Class[]
{IntValue.class}).createComparator();
- final ChannelSelector<SerializationDelegate<Record>> oe1 = new
OutputEmitter<Record>(ShipStrategyType.FORWARD, intComp);
- final SerializationDelegate<Record> delegate = new
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
+ final int numChannels = 100;
- int numChannels = 100;
+ // Test for IntValue
int numRecords = 50000 + numChannels / 2;
-
- int[] hit = new int[numChannels];
-
- for (int i = 0; i < numRecords; i++) {
- IntValue k = new IntValue(i);
- Record rec = new Record(k);
- delegate.setInstance(rec);
-
- int[] chans = oe1.selectChannels(delegate, hit.length);
- for (int chan : chans) {
- hit[chan]++;
- }
- }
-
- assertTrue(hit[0] == numRecords);
- for (int i = 1; i < hit.length; i++) {
- assertTrue(hit[i] == 0);
- }
+ verifyForwardSelectedChannels(numRecords, numChannels,
RecordType.INTEGER);
// Test for StringValue
- @SuppressWarnings({"unchecked", "rawtypes"})
- final TypeComparator<Record> stringComp = new
RecordComparatorFactory(new int[] {0}, new Class[]
{StringValue.class}).createComparator();
- final ChannelSelector<SerializationDelegate<Record>> oe2 = new
OutputEmitter<Record>(ShipStrategyType.FORWARD, stringComp);
-
- numChannels = 100;
numRecords = 10000 + numChannels / 2;
-
- hit = new int[numChannels];
-
- for (int i = 0; i < numRecords; i++) {
- StringValue k = new StringValue(i + "");
- Record rec = new Record(k);
- delegate.setInstance(rec);
-
- int[] chans = oe2.selectChannels(delegate, hit.length);
- for (int chan : chans) {
- hit[chan]++;
- }
- }
-
- assertTrue(hit[0] == numRecords);
- for (int i = 1; i < hit.length; i++) {
- assertTrue(hit[i] == 0);
- }
+ verifyForwardSelectedChannels(numRecords, numChannels,
RecordType.STRING);
}
@Test
public void testForcedRebalance() {
- // Test for IntValue
- int numChannels = 100;
- int toTaskIndex = numChannels * 6/7;
+ final int numChannels = 100;
+ int toTaskIndex = numChannels * 6 / 7;
int fromTaskIndex = toTaskIndex + numChannels;
int extraRecords = numChannels / 3;
int numRecords = 50000 + extraRecords;
+ final SerializationDelegate<Record> delegate = new
SerializationDelegate<>(
+ new RecordSerializerFactory().getSerializer());
+ final ChannelSelector<SerializationDelegate<Record>> selector =
new OutputEmitter<>(
+ ShipStrategyType.PARTITION_FORCED_REBALANCE,
fromTaskIndex);
- final ChannelSelector<SerializationDelegate<Record>> oe1 = new
OutputEmitter<Record>(ShipStrategyType.PARTITION_FORCED_REBALANCE,
fromTaskIndex);
- final SerializationDelegate<Record> delegate = new
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-
- int[] hit = new int[numChannels];
-
- for (int i = 0; i < numRecords; i++) {
- IntValue k = new IntValue(i);
- Record rec = new Record(k);
- delegate.setInstance(rec);
-
- int[] chans = oe1.selectChannels(delegate, hit.length);
- for (int chan : chans) {
- hit[chan]++;
- }
- }
-
- int cnt = 0;
- for (int i = 0; i < hit.length; i++) {
- if (toTaskIndex <= i || i <
toTaskIndex+extraRecords-numChannels) {
- assertTrue(hit[i] ==
(numRecords/numChannels)+1);
+ // Test for IntValue
+ int[] hits = getSelectedChannelsHitCount(selector, delegate,
RecordType.INTEGER, numRecords, numChannels);
+ int totalHitCount = 0;
+ for (int i = 0; i < hits.length; i++) {
+ if (toTaskIndex <= i || i < toTaskIndex+extraRecords -
numChannels) {
+ assertTrue(hits[i] == (numRecords /
numChannels) + 1);
} else {
- assertTrue(hit[i] == numRecords/numChannels);
+ assertTrue(hits[i] == numRecords/numChannels);
}
- cnt += hit[i];
+ totalHitCount += hits[i];
}
- assertTrue(cnt == numRecords);
+ assertTrue(totalHitCount == numRecords);
- // Test for StringValue
- numChannels = 100;
toTaskIndex = numChannels / 5;
fromTaskIndex = toTaskIndex + 2 * numChannels;
- extraRecords = numChannels * 2/9;
+ extraRecords = numChannels * 2 / 9;
numRecords = 10000 + extraRecords;
- final ChannelSelector<SerializationDelegate<Record>> oe2 = new
OutputEmitter<Record>(ShipStrategyType.PARTITION_FORCED_REBALANCE,
fromTaskIndex);
-
- hit = new int[numChannels];
-
- for (int i = 0; i < numRecords; i++) {
- StringValue k = new StringValue(i + "");
- Record rec = new Record(k);
- delegate.setInstance(rec);
-
- int[] chans = oe2.selectChannels(delegate, hit.length);
- for (int chan : chans) {
- hit[chan]++;
- }
- }
-
- cnt = 0;
- for (int i = 0; i < hit.length; i++) {
- if (toTaskIndex <= i && i < toTaskIndex+extraRecords) {
- assertTrue(hit[i] ==
(numRecords/numChannels)+1);
+ // Test for StringValue
+ final ChannelSelector<SerializationDelegate<Record>> selector2
= new OutputEmitter<>(
+ ShipStrategyType.PARTITION_FORCED_REBALANCE,
fromTaskIndex);
+ hits = getSelectedChannelsHitCount(selector2, delegate,
RecordType.STRING, numRecords, numChannels);
+ totalHitCount = 0;
+ for (int i = 0; i < hits.length; i++) {
+ if (toTaskIndex <= i && i < toTaskIndex + extraRecords)
{
+ assertTrue(hits[i] == (numRecords /
numChannels) + 1);
} else {
- assertTrue(hit[i] == numRecords/numChannels);
+ assertTrue(hits[i] == numRecords / numChannels);
}
- cnt += hit[i];
+ totalHitCount += hits[i];
}
- assertTrue(cnt == numRecords);
+ assertTrue(totalHitCount == numRecords);
}
@Test
public void testBroadcast() {
// Test for IntValue
- @SuppressWarnings({"unchecked", "rawtypes"})
- final TypeComparator<Record> intComp = new
RecordComparatorFactory(new int[] {0}, new Class[]
{IntValue.class}).createComparator();
- final ChannelSelector<SerializationDelegate<Record>> oe1 = new
OutputEmitter<Record>(ShipStrategyType.BROADCAST, intComp);
- final SerializationDelegate<Record> delegate = new
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-
- int numChannels = 100;
- int numRecords = 50000;
-
- int[] hit = new int[numChannels];
-
- for (int i = 0; i < numRecords; i++) {
- IntValue k = new IntValue(i);
- Record rec = new Record(k);
- delegate.setInstance(rec);
-
- int[] chans = oe1.selectChannels(delegate, hit.length);
- for (int chan : chans) {
- hit[chan]++;
- }
- }
-
- for (int aHit : hit) {
- assertTrue(aHit + "", aHit == numRecords);
- }
-
+ verifyBroadcastSelectedChannels(100, 50000, RecordType.INTEGER);
// Test for StringValue
- @SuppressWarnings({"unchecked", "rawtypes"})
- final TypeComparator<Record> stringComp = new
RecordComparatorFactory(new int[] {0}, new Class[]
{StringValue.class}).createComparator();
- final ChannelSelector<SerializationDelegate<Record>> oe2 = new
OutputEmitter<Record>(ShipStrategyType.BROADCAST, stringComp);
-
- numChannels = 100;
- numRecords = 5000;
-
- hit = new int[numChannels];
-
- for (int i = 0; i < numRecords; i++) {
- StringValue k = new StringValue(i + "");
- Record rec = new Record(k);
- delegate.setInstance(rec);
-
- int[] chans = oe2.selectChannels(delegate, hit.length);
- for (int chan : chans) {
- hit[chan]++;
- }
- }
-
- for (int aHit : hit) {
- assertTrue(aHit + "", aHit == numRecords);
- }
+ verifyBroadcastSelectedChannels(100, 50000, RecordType.STRING);
}
@Test
public void testMultiKeys() {
- @SuppressWarnings({"unchecked", "rawtypes"})
- final TypeComparator<Record> multiComp = new
RecordComparatorFactory(new int[] {0,1,3}, new Class[] {IntValue.class,
StringValue.class, DoubleValue.class}).createComparator();
- final ChannelSelector<SerializationDelegate<Record>> oe1 = new
OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, multiComp);
- final SerializationDelegate<Record> delegate = new
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
+ final TypeComparator<Record> multiComp = new
RecordComparatorFactory(
+ new int[] {0,1, 3}, new Class[] {IntValue.class,
StringValue.class, DoubleValue.class}).createComparator();
+ final ChannelSelector<SerializationDelegate<Record>> selector =
new OutputEmitter<>(
+ ShipStrategyType.PARTITION_HASH, multiComp);
+ final SerializationDelegate<Record> delegate = new
SerializationDelegate<>(new RecordSerializerFactory().getSerializer());
int numChannels = 100;
int numRecords = 5000;
-
- int[] hit = new int[numChannels];
-
+ int[] hits = new int[numChannels];
for (int i = 0; i < numRecords; i++) {
- Record rec = new Record(4);
- rec.setField(0, new IntValue(i));
- rec.setField(1, new StringValue("AB"+i+"CD"+i));
- rec.setField(3, new DoubleValue(i*3.141d));
- delegate.setInstance(rec);
-
- int[] chans = oe1.selectChannels(delegate, hit.length);
- for (int chan : chans) {
- hit[chan]++;
+ Record record = new Record(4);
+ record.setField(0, new IntValue(i));
+ record.setField(1, new StringValue("AB" + i + "CD" +
i));
+ record.setField(3, new DoubleValue(i * 3.141d));
+ delegate.setInstance(record);
+
+ int[] channels = selector.selectChannels(delegate,
hits.length);
+ for (int channel : channels) {
+ hits[channel]++;
}
}
- int cnt = 0;
- for (int aHit : hit) {
- assertTrue(aHit > 0);
- cnt += aHit;
+ int totalHitCount = 0;
+ for (int hit : hits) {
+ assertTrue(hit > 0);
+ totalHitCount += hit;
}
- assertTrue(cnt == numRecords);
-
+ assertTrue(totalHitCount == numRecords);
}
@Test
public void testMissingKey() {
- // Test for IntValue
- @SuppressWarnings({"unchecked", "rawtypes"})
- final TypeComparator<Record> intComp = new
RecordComparatorFactory(new int[] {1}, new Class[]
{IntValue.class}).createComparator();
- final ChannelSelector<SerializationDelegate<Record>> oe1 = new
OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, intComp);
- final SerializationDelegate<Record> delegate = new
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-
- Record rec = new Record(0);
- rec.setField(0, new IntValue(1));
- delegate.setInstance(rec);
-
- try {
- oe1.selectChannels(delegate, 100);
- } catch (KeyFieldOutOfBoundsException re) {
- Assert.assertEquals(1, re.getFieldNumber());
- return;
+ if (!verifyWrongPartitionHashKey(1, 0)) {
+ Assert.fail("Expected a KeyFieldOutOfBoundsException.");
}
- Assert.fail("Expected a KeyFieldOutOfBoundsException.");
}
@Test
public void testNullKey() {
- // Test for IntValue
- @SuppressWarnings({"unchecked", "rawtypes"})
- final TypeComparator<Record> intComp = new
RecordComparatorFactory(new int[] {0}, new Class[]
{IntValue.class}).createComparator();
- final ChannelSelector<SerializationDelegate<Record>> oe1 = new
OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, intComp);
- final SerializationDelegate<Record> delegate = new
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-
- Record rec = new Record(2);
- rec.setField(1, new IntValue(1));
- delegate.setInstance(rec);
-
- try {
- oe1.selectChannels(delegate, 100);
- } catch (NullKeyFieldException re) {
- Assert.assertEquals(0, re.getFieldNumber());
- return;
+ if (!verifyWrongPartitionHashKey(0, 1)) {
+ Assert.fail("Expected a NullKeyFieldException.");
}
- Assert.fail("Expected a NullKeyFieldException.");
}
@Test
- public void testWrongKeyClass() {
-
+ public void testWrongKeyClass() throws Exception {
// Test for IntValue
- @SuppressWarnings({"unchecked", "rawtypes"})
- final TypeComparator<Record> doubleComp = new
RecordComparatorFactory(new int[] {0}, new Class[]
{DoubleValue.class}).createComparator();
- final ChannelSelector<SerializationDelegate<Record>> oe1 = new
OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, doubleComp);
- final SerializationDelegate<Record> delegate = new
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-
-
- ;
-
- Record rec = null;
-
- try {
- PipedInputStream pipedInput = new
PipedInputStream(1024*1024);
-
- DataInputView in = new
DataInputViewStreamWrapper(pipedInput);
- DataOutputView out = new
DataOutputViewStreamWrapper(new PipedOutputStream(pipedInput));
-
- rec = new Record(1);
- rec.setField(0, new IntValue());
-
- rec.write(out);
- rec = new Record();
- rec.read(in);
- } catch (IOException e) {
- fail("Test erroneous");
- }
+ final TypeComparator<Record> doubleComp = new
RecordComparatorFactory(
+ new int[] {0}, new Class[]
{DoubleValue.class}).createComparator();
+ final ChannelSelector<SerializationDelegate<Record>> selector =
new OutputEmitter<>(
+ ShipStrategyType.PARTITION_HASH, doubleComp);
+ final SerializationDelegate<Record> delegate = new
SerializationDelegate<>(new RecordSerializerFactory().getSerializer());
+
+ PipedInputStream pipedInput = new PipedInputStream(1024 * 1024);
+ DataInputView in = new DataInputViewStreamWrapper(pipedInput);
+ DataOutputView out = new DataOutputViewStreamWrapper(new
PipedOutputStream(pipedInput));
+
+ Record record = new Record(1);
+ record.setField(0, new IntValue());
+ record.write(out);
+ record = new Record();
+ record.read(in);
try {
- delegate.setInstance(rec);
- oe1.selectChannels(delegate, 100);
+ delegate.setInstance(record);
+ selector.selectChannels(delegate, 100);
} catch (DeserializationException re) {
return;
}
Assert.fail("Expected a NullKeyFieldException.");
}
-
- @SuppressWarnings({"serial", "rawtypes"})
+
+ private void verifyPartitionHashSelectedChannels(int numRecords, int
numChannels, Enum recordType) {
+ int[] hits =
getSelectedChannelsHitCount(ShipStrategyType.PARTITION_HASH, numRecords,
numChannels, recordType);
+
+ int totalHitCount = 0;
+ for (int hit : hits) {
+ assertTrue(hit > 0);
+ totalHitCount += hit;
+ }
+ assertTrue(totalHitCount == numRecords);
+ }
+
+ private void assertPartitionHashSelectedChannels(
+ ChannelSelector selector,
+ SerializationDelegate<Integer> serializationDelegate,
+ int record,
+ int numChannels) {
+ serializationDelegate.setInstance(record);
+ int[] selectedChannels =
selector.selectChannels(serializationDelegate, numChannels);
+
+ assertTrue(selectedChannels.length == 1);
+ assertTrue(selectedChannels[0] >= 0 && selectedChannels[0] <=
numChannels - 1);
+ }
+
+ private void verifyForwardSelectedChannels(int numRecords, int
numChannels, Enum recordType) {
+ int[] hits =
getSelectedChannelsHitCount(ShipStrategyType.FORWARD, numRecords, numChannels,
recordType);
+
+ assertTrue(hits[0] == numRecords);
+ for (int i = 1; i < hits.length; i++) {
+ assertTrue(hits[i] == 0);
+ }
+ }
+
+ private void verifyBroadcastSelectedChannels(int numRecords, int
numChannels, Enum recordType) {
+ int[] hits =
getSelectedChannelsHitCount(ShipStrategyType.BROADCAST, numRecords,
numChannels, recordType);
+
+ for (int hit : hits) {
+ assertTrue(hit + "", hit == numRecords);
+ }
+ }
+
+ private boolean verifyWrongPartitionHashKey(int position, int fieldNum)
{
+ final TypeComparator<Record> comparator = new
RecordComparatorFactory(
+ new int[] {position}, new Class[]
{IntValue.class}).createComparator();
+ final ChannelSelector<SerializationDelegate<Record>> selector =
new OutputEmitter<>(
+ ShipStrategyType.PARTITION_HASH, comparator);
+ final SerializationDelegate<Record> delegate = new
SerializationDelegate<>(new RecordSerializerFactory().getSerializer());
+
+ Record record = new Record(2);
+ record.setField(fieldNum, new IntValue(1));
+ delegate.setInstance(record);
+
+ try {
+ selector.selectChannels(delegate, 100);
+ } catch (NullKeyFieldException re) {
+ Assert.assertEquals(position, re.getFieldNumber());
+ return true;
+ }
+ return false;
+ }
+
+ private int[] getSelectedChannelsHitCount(
+ ShipStrategyType shipStrategyType,
+ int numRecords,
+ int numChannels,
+ Enum recordType) {
+ final TypeComparator<Record> comparator = new
RecordComparatorFactory(
+ new int[] {0}, new Class[] {recordType ==
RecordType.INTEGER ? IntValue.class : StringValue.class}).createComparator();
+ final ChannelSelector<SerializationDelegate<Record>> selector =
new OutputEmitter<>(shipStrategyType, comparator);
+ final SerializationDelegate<Record> delegate = new
SerializationDelegate<>(new RecordSerializerFactory().getSerializer());
+
+ return getSelectedChannelsHitCount(selector, delegate,
recordType, numRecords, numChannels);
+ }
+
+ private int[] getSelectedChannelsHitCount(
+ ChannelSelector<SerializationDelegate<Record>> selector,
+ SerializationDelegate<Record> delegate,
+ Enum recordType,
+ int numRecords,
+ int numChannels) {
+ int[] hits = new int[numChannels];
+ Value value;
+ for (int i = 0; i < numRecords; i++) {
+ if (recordType == RecordType.INTEGER) {
+ value = new IntValue(i);
+ } else {
+ value = new StringValue(i + "");
+ }
+ Record record = new Record(value);
+ delegate.setInstance(record);
+
+ int[] channels = selector.selectChannels(delegate,
hits.length);
+ for (int channel : channels) {
+ hits[channel]++;
+ }
+ }
+ return hits;
+ }
+
private static class TestIntComparator extends TypeComparator<Integer> {
private TypeComparator[] comparators = new TypeComparator[]{new
IntComparator(true)};
@@ -517,4 +382,9 @@ public int extractKeys(Object record, Object[] target, int
index) {
return comparators;
}
}
+
+ private enum RecordType {
+ STRING,
+ INTEGER
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services