[
https://issues.apache.org/jira/browse/FLINK-10942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695676#comment-16695676
]
ASF GitHub Bot commented on FLINK-10942:
----------------------------------------
pnowojski commented on a change in pull request #7146:
[FLINK-10942][network,test] Deduplicate common codes in OutputEmitterTest
URL: https://github.com/apache/flink/pull/7146#discussion_r235636425
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
##########
@@ -48,404 +49,272 @@
import java.io.PipedOutputStream;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+@SuppressWarnings({"unchecked", "rawtypes"})
public class OutputEmitterTest {
-
-
+
@Test
public void testPartitionHash() {
// Test for IntValue
- @SuppressWarnings({"unchecked", "rawtypes"})
- final TypeComparator<Record> intComp = new
RecordComparatorFactory(new int[] {0}, new Class[]
{IntValue.class}).createComparator();
- final ChannelSelector<SerializationDelegate<Record>> oe1 = new
OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, intComp);
- final SerializationDelegate<Record> delegate = new
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-
- int numChans = 100;
- int numRecs = 50000;
- int[] hit = new int[numChans];
-
- for (int i = 0; i < numRecs; i++) {
- IntValue k = new IntValue(i);
- Record rec = new Record(k);
-
- delegate.setInstance(rec);
-
- int[] chans = oe1.selectChannels(delegate, hit.length);
- for (int chan : chans) {
- hit[chan]++;
- }
- }
-
- int cnt = 0;
- for (int aHit : hit) {
- assertTrue(aHit > 0);
- cnt += aHit;
- }
- assertTrue(cnt == numRecs);
-
+ verifyPartitionHashSelectedChannels(50000, 100, new
Either.Left<>(0));
// Test for StringValue
- @SuppressWarnings({"unchecked", "rawtypes"})
- final TypeComparator<Record> stringComp = new
RecordComparatorFactory(new int[] {0}, new Class[]
{StringValue.class}).createComparator();
- final ChannelSelector<SerializationDelegate<Record>> oe2 = new
OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, stringComp);
-
- numChans = 100;
- numRecs = 10000;
-
- hit = new int[numChans];
-
- for (int i = 0; i < numRecs; i++) {
- StringValue k = new StringValue(i + "");
- Record rec = new Record(k);
- delegate.setInstance(rec);
-
- int[] chans = oe2.selectChannels(delegate, hit.length);
- for (int chan : chans) {
- hit[chan]++;
- }
- }
+ verifyPartitionHashSelectedChannels(10000, 100, new
Either.Right<>(""));
- cnt = 0;
- for (int aHit : hit) {
- assertTrue(aHit > 0);
- cnt += aHit;
- }
- assertTrue(cnt == numRecs);
-
- // test hash corner cases
+ // Test hash corner cases
final TestIntComparator testIntComp = new TestIntComparator();
- final ChannelSelector<SerializationDelegate<Integer>> oe3 = new
OutputEmitter<Integer>(ShipStrategyType.PARTITION_HASH, testIntComp);
- final SerializationDelegate<Integer> intDel = new
SerializationDelegate<Integer>(new IntSerializer());
-
- numChans = 100;
-
+ final ChannelSelector<SerializationDelegate<Integer>> selector
= new OutputEmitter<>(
+ ShipStrategyType.PARTITION_HASH, testIntComp);
+ final SerializationDelegate<Integer> serializationDelegate =
new SerializationDelegate<>(new IntSerializer());
+
// MinVal hash
- intDel.setInstance(Integer.MIN_VALUE);
- int[] chans = oe3.selectChannels(intDel, numChans);
- assertTrue(chans.length == 1);
- assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
-
+ assertPartitionHashSelectedChannels(selector,
serializationDelegate, Integer.MIN_VALUE, 100);
// -1 hash
- intDel.setInstance(-1);
- chans = oe3.selectChannels(intDel, hit.length);
- assertTrue(chans.length == 1);
- assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
-
+ assertPartitionHashSelectedChannels(selector,
serializationDelegate, -1, 100);
// 0 hash
- intDel.setInstance(0);
- chans = oe3.selectChannels(intDel, hit.length);
- assertTrue(chans.length == 1);
- assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
-
+ assertPartitionHashSelectedChannels(selector,
serializationDelegate, 0, 100);
// 1 hash
- intDel.setInstance(1);
- chans = oe3.selectChannels(intDel, hit.length);
- assertTrue(chans.length == 1);
- assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
-
+ assertPartitionHashSelectedChannels(selector,
serializationDelegate, 1, 100);
// MaxVal hash
- intDel.setInstance(Integer.MAX_VALUE);
- chans = oe3.selectChannels(intDel, hit.length);
- assertTrue(chans.length == 1);
- assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
+ assertPartitionHashSelectedChannels(selector,
serializationDelegate, Integer.MAX_VALUE, 100);
}
@Test
public void testForward() {
- // Test for IntValue
- @SuppressWarnings({"unchecked", "rawtypes"})
- final TypeComparator<Record> intComp = new
RecordComparatorFactory(new int[] {0}, new Class[]
{IntValue.class}).createComparator();
- final ChannelSelector<SerializationDelegate<Record>> oe1 = new
OutputEmitter<Record>(ShipStrategyType.FORWARD, intComp);
- final SerializationDelegate<Record> delegate = new
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
+ final int numChannels = 100;
- int numChannels = 100;
+ // Test for IntValue
int numRecords = 50000 + numChannels / 2;
-
- int[] hit = new int[numChannels];
-
- for (int i = 0; i < numRecords; i++) {
- IntValue k = new IntValue(i);
- Record rec = new Record(k);
- delegate.setInstance(rec);
-
- int[] chans = oe1.selectChannels(delegate, hit.length);
- for (int chan : chans) {
- hit[chan]++;
- }
- }
-
- assertTrue(hit[0] == numRecords);
- for (int i = 1; i < hit.length; i++) {
- assertTrue(hit[i] == 0);
- }
+ verifyForwardSelectedChannels(numRecords, numChannels, new
Either.Left<>(0));
// Test for StringValue
- @SuppressWarnings({"unchecked", "rawtypes"})
- final TypeComparator<Record> stringComp = new
RecordComparatorFactory(new int[] {0}, new Class[]
{StringValue.class}).createComparator();
- final ChannelSelector<SerializationDelegate<Record>> oe2 = new
OutputEmitter<Record>(ShipStrategyType.FORWARD, stringComp);
-
- numChannels = 100;
numRecords = 10000 + numChannels / 2;
-
- hit = new int[numChannels];
-
- for (int i = 0; i < numRecords; i++) {
- StringValue k = new StringValue(i + "");
- Record rec = new Record(k);
- delegate.setInstance(rec);
-
- int[] chans = oe2.selectChannels(delegate, hit.length);
- for (int chan : chans) {
- hit[chan]++;
- }
- }
-
- assertTrue(hit[0] == numRecords);
- for (int i = 1; i < hit.length; i++) {
- assertTrue(hit[i] == 0);
- }
+ verifyForwardSelectedChannels(numRecords, numChannels, new
Either.Right<>(""));
}
@Test
public void testForcedRebalance() {
- // Test for IntValue
- int numChannels = 100;
- int toTaskIndex = numChannels * 6/7;
+ final int numChannels = 100;
+ int toTaskIndex = numChannels * 6 / 7;
int fromTaskIndex = toTaskIndex + numChannels;
int extraRecords = numChannels / 3;
int numRecords = 50000 + extraRecords;
+ final SerializationDelegate<Record> delegate = new
SerializationDelegate<>(
+ new RecordSerializerFactory().getSerializer());
+ final ChannelSelector<SerializationDelegate<Record>> selector =
new OutputEmitter<>(
+ ShipStrategyType.PARTITION_FORCED_REBALANCE,
fromTaskIndex);
- final ChannelSelector<SerializationDelegate<Record>> oe1 = new
OutputEmitter<Record>(ShipStrategyType.PARTITION_FORCED_REBALANCE,
fromTaskIndex);
- final SerializationDelegate<Record> delegate = new
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-
- int[] hit = new int[numChannels];
-
- for (int i = 0; i < numRecords; i++) {
- IntValue k = new IntValue(i);
- Record rec = new Record(k);
- delegate.setInstance(rec);
-
- int[] chans = oe1.selectChannels(delegate, hit.length);
- for (int chan : chans) {
- hit[chan]++;
- }
- }
-
+ // Test for IntValue
+ int[] hits = getSelectedChannels(selector, delegate, new
Either.Left<>(0), numRecords, numChannels);
int cnt = 0;
- for (int i = 0; i < hit.length; i++) {
- if (toTaskIndex <= i || i <
toTaskIndex+extraRecords-numChannels) {
- assertTrue(hit[i] ==
(numRecords/numChannels)+1);
+ for (int i = 0; i < hits.length; i++) {
+ if (toTaskIndex <= i || i < toTaskIndex+extraRecords -
numChannels) {
+ assertTrue(hits[i] == (numRecords /
numChannels) + 1);
} else {
- assertTrue(hit[i] == numRecords/numChannels);
+ assertTrue(hits[i] == numRecords/numChannels);
}
- cnt += hit[i];
+ cnt += hits[i];
}
assertTrue(cnt == numRecords);
- // Test for StringValue
- numChannels = 100;
toTaskIndex = numChannels / 5;
fromTaskIndex = toTaskIndex + 2 * numChannels;
- extraRecords = numChannels * 2/9;
+ extraRecords = numChannels * 2 / 9;
numRecords = 10000 + extraRecords;
- final ChannelSelector<SerializationDelegate<Record>> oe2 = new
OutputEmitter<Record>(ShipStrategyType.PARTITION_FORCED_REBALANCE,
fromTaskIndex);
-
- hit = new int[numChannels];
-
- for (int i = 0; i < numRecords; i++) {
- StringValue k = new StringValue(i + "");
- Record rec = new Record(k);
- delegate.setInstance(rec);
-
- int[] chans = oe2.selectChannels(delegate, hit.length);
- for (int chan : chans) {
- hit[chan]++;
- }
- }
-
+ // Test for StringValue
+ final ChannelSelector<SerializationDelegate<Record>> selector2
= new OutputEmitter<>(
+ ShipStrategyType.PARTITION_FORCED_REBALANCE,
fromTaskIndex);
+ hits = getSelectedChannels(selector2, delegate, new
Either.Right<>(""), numRecords, numChannels);
cnt = 0;
- for (int i = 0; i < hit.length; i++) {
- if (toTaskIndex <= i && i < toTaskIndex+extraRecords) {
- assertTrue(hit[i] ==
(numRecords/numChannels)+1);
+ for (int i = 0; i < hits.length; i++) {
+ if (toTaskIndex <= i && i < toTaskIndex + extraRecords)
{
+ assertTrue(hits[i] == (numRecords /
numChannels) + 1);
} else {
- assertTrue(hit[i] == numRecords/numChannels);
+ assertTrue(hits[i] == numRecords / numChannels);
}
- cnt += hit[i];
+ cnt += hits[i];
}
assertTrue(cnt == numRecords);
}
@Test
public void testBroadcast() {
// Test for IntValue
- @SuppressWarnings({"unchecked", "rawtypes"})
- final TypeComparator<Record> intComp = new
RecordComparatorFactory(new int[] {0}, new Class[]
{IntValue.class}).createComparator();
- final ChannelSelector<SerializationDelegate<Record>> oe1 = new
OutputEmitter<Record>(ShipStrategyType.BROADCAST, intComp);
- final SerializationDelegate<Record> delegate = new
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-
- int numChannels = 100;
- int numRecords = 50000;
-
- int[] hit = new int[numChannels];
-
- for (int i = 0; i < numRecords; i++) {
- IntValue k = new IntValue(i);
- Record rec = new Record(k);
- delegate.setInstance(rec);
-
- int[] chans = oe1.selectChannels(delegate, hit.length);
- for (int chan : chans) {
- hit[chan]++;
- }
- }
-
- for (int aHit : hit) {
- assertTrue(aHit + "", aHit == numRecords);
- }
-
+ verifyBroadcastSelectedChannels(100, 50000, new
Either.Left<>(0));
// Test for StringValue
- @SuppressWarnings({"unchecked", "rawtypes"})
- final TypeComparator<Record> stringComp = new
RecordComparatorFactory(new int[] {0}, new Class[]
{StringValue.class}).createComparator();
- final ChannelSelector<SerializationDelegate<Record>> oe2 = new
OutputEmitter<Record>(ShipStrategyType.BROADCAST, stringComp);
-
- numChannels = 100;
- numRecords = 5000;
-
- hit = new int[numChannels];
-
- for (int i = 0; i < numRecords; i++) {
- StringValue k = new StringValue(i + "");
- Record rec = new Record(k);
- delegate.setInstance(rec);
-
- int[] chans = oe2.selectChannels(delegate, hit.length);
- for (int chan : chans) {
- hit[chan]++;
- }
- }
-
- for (int aHit : hit) {
- assertTrue(aHit + "", aHit == numRecords);
- }
+ verifyBroadcastSelectedChannels(100, 50000, new
Either.Right<>(""));
}
@Test
public void testMultiKeys() {
- @SuppressWarnings({"unchecked", "rawtypes"})
- final TypeComparator<Record> multiComp = new
RecordComparatorFactory(new int[] {0,1,3}, new Class[] {IntValue.class,
StringValue.class, DoubleValue.class}).createComparator();
- final ChannelSelector<SerializationDelegate<Record>> oe1 = new
OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, multiComp);
- final SerializationDelegate<Record> delegate = new
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
+ final TypeComparator<Record> multiComp = new
RecordComparatorFactory(
+ new int[] {0,1, 3}, new Class[] {IntValue.class,
StringValue.class, DoubleValue.class}).createComparator();
+ final ChannelSelector<SerializationDelegate<Record>> selector =
new OutputEmitter<>(ShipStrategyType.PARTITION_HASH, multiComp);
+ final SerializationDelegate<Record> delegate = new
SerializationDelegate<>(new RecordSerializerFactory().getSerializer());
int numChannels = 100;
int numRecords = 5000;
-
- int[] hit = new int[numChannels];
-
+ int[] hits = new int[numChannels];
for (int i = 0; i < numRecords; i++) {
- Record rec = new Record(4);
- rec.setField(0, new IntValue(i));
- rec.setField(1, new StringValue("AB"+i+"CD"+i));
- rec.setField(3, new DoubleValue(i*3.141d));
- delegate.setInstance(rec);
-
- int[] chans = oe1.selectChannels(delegate, hit.length);
- for (int chan : chans) {
- hit[chan]++;
+ Record record = new Record(4);
+ record.setField(0, new IntValue(i));
+ record.setField(1, new StringValue("AB" + i + "CD" +
i));
+ record.setField(3, new DoubleValue(i * 3.141d));
+ delegate.setInstance(record);
+
+ int[] channels = selector.selectChannels(delegate,
hits.length);
+ for (int channel : channels) {
+ hits[channel]++;
}
}
int cnt = 0;
- for (int aHit : hit) {
- assertTrue(aHit > 0);
- cnt += aHit;
+ for (int hit : hits) {
+ assertTrue(hit > 0);
+ cnt += hit;
}
assertTrue(cnt == numRecords);
-
}
@Test
public void testMissingKey() {
- // Test for IntValue
- @SuppressWarnings({"unchecked", "rawtypes"})
- final TypeComparator<Record> intComp = new
RecordComparatorFactory(new int[] {1}, new Class[]
{IntValue.class}).createComparator();
- final ChannelSelector<SerializationDelegate<Record>> oe1 = new
OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, intComp);
- final SerializationDelegate<Record> delegate = new
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-
- Record rec = new Record(0);
- rec.setField(0, new IntValue(1));
- delegate.setInstance(rec);
-
- try {
- oe1.selectChannels(delegate, 100);
- } catch (KeyFieldOutOfBoundsException re) {
- Assert.assertEquals(1, re.getFieldNumber());
- return;
+ if (!verifyWrongPartitionHashKey(1, 0)) {
+ Assert.fail("Expected a KeyFieldOutOfBoundsException.");
}
- Assert.fail("Expected a KeyFieldOutOfBoundsException.");
}
@Test
public void testNullKey() {
- // Test for IntValue
- @SuppressWarnings({"unchecked", "rawtypes"})
- final TypeComparator<Record> intComp = new
RecordComparatorFactory(new int[] {0}, new Class[]
{IntValue.class}).createComparator();
- final ChannelSelector<SerializationDelegate<Record>> oe1 = new
OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, intComp);
- final SerializationDelegate<Record> delegate = new
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-
- Record rec = new Record(2);
- rec.setField(1, new IntValue(1));
- delegate.setInstance(rec);
-
- try {
- oe1.selectChannels(delegate, 100);
- } catch (NullKeyFieldException re) {
- Assert.assertEquals(0, re.getFieldNumber());
- return;
+ if (!verifyWrongPartitionHashKey(0, 1)) {
+ Assert.fail("Expected a NullKeyFieldException.");
}
- Assert.fail("Expected a NullKeyFieldException.");
}
@Test
- public void testWrongKeyClass() {
-
+ public void testWrongKeyClass() throws Exception {
// Test for IntValue
- @SuppressWarnings({"unchecked", "rawtypes"})
- final TypeComparator<Record> doubleComp = new
RecordComparatorFactory(new int[] {0}, new Class[]
{DoubleValue.class}).createComparator();
- final ChannelSelector<SerializationDelegate<Record>> oe1 = new
OutputEmitter<Record>(ShipStrategyType.PARTITION_HASH, doubleComp);
- final SerializationDelegate<Record> delegate = new
SerializationDelegate<Record>(new RecordSerializerFactory().getSerializer());
-
-
- ;
-
- Record rec = null;
-
- try {
- PipedInputStream pipedInput = new
PipedInputStream(1024*1024);
-
- DataInputView in = new
DataInputViewStreamWrapper(pipedInput);
- DataOutputView out = new
DataOutputViewStreamWrapper(new PipedOutputStream(pipedInput));
-
- rec = new Record(1);
- rec.setField(0, new IntValue());
-
- rec.write(out);
- rec = new Record();
- rec.read(in);
- } catch (IOException e) {
- fail("Test erroneous");
- }
+ final TypeComparator<Record> doubleComp = new
RecordComparatorFactory(
+ new int[] {0}, new Class[]
{DoubleValue.class}).createComparator();
+ final ChannelSelector<SerializationDelegate<Record>> selector =
new OutputEmitter<>(
+ ShipStrategyType.PARTITION_HASH, doubleComp);
+ final SerializationDelegate<Record> delegate = new
SerializationDelegate<>(new RecordSerializerFactory().getSerializer());
+
+ PipedInputStream pipedInput = new PipedInputStream(1024 * 1024);
+ DataInputView in = new DataInputViewStreamWrapper(pipedInput);
+ DataOutputView out = new DataOutputViewStreamWrapper(new
PipedOutputStream(pipedInput));
+
+ Record record = new Record(1);
+ record.setField(0, new IntValue());
+ record.write(out);
+ record = new Record();
+ record.read(in);
try {
- delegate.setInstance(rec);
- oe1.selectChannels(delegate, 100);
+ delegate.setInstance(record);
+ selector.selectChannels(delegate, 100);
} catch (DeserializationException re) {
return;
}
Assert.fail("Expected a NullKeyFieldException.");
}
-
- @SuppressWarnings({"serial", "rawtypes"})
+
+ private boolean verifyWrongPartitionHashKey(int position, int fieldNum)
{
+ final TypeComparator<Record> comparator = new
RecordComparatorFactory(
+ new int[] {position}, new Class[]
{IntValue.class}).createComparator();
+ final ChannelSelector<SerializationDelegate<Record>> selector =
new OutputEmitter<>(
+ ShipStrategyType.PARTITION_HASH, comparator);
+ final SerializationDelegate<Record> delegate = new
SerializationDelegate<>(new RecordSerializerFactory().getSerializer());
+
+ Record record = new Record(2);
+ record.setField(fieldNum, new IntValue(1));
+ delegate.setInstance(record);
+
+ try {
+ selector.selectChannels(delegate, 100);
+ } catch (NullKeyFieldException re) {
+ Assert.assertEquals(position, re.getFieldNumber());
+ return true;
+ }
+ return false;
+ }
+
+ private int[] getSelectedChannels(
+ ShipStrategyType shipStrategyType,
+ int numRecords,
+ int numChannels,
+ Either<Integer, String> recordType) {
+ final TypeComparator<Record> comparator = new
RecordComparatorFactory(
+ new int[] {0}, new Class[] {recordType.isLeft() ?
IntValue.class : StringValue.class}).createComparator();
+ final ChannelSelector<SerializationDelegate<Record>> selector =
new OutputEmitter<>(shipStrategyType, comparator);
+ final SerializationDelegate<Record> delegate = new
SerializationDelegate<>(new RecordSerializerFactory().getSerializer());
+ return getSelectedChannels(selector, delegate, recordType,
numRecords, numChannels);
+ }
+
+ private int[] getSelectedChannels(
+ ChannelSelector<SerializationDelegate<Record>> selector,
+ SerializationDelegate<Record> delegate,
+ Either<Integer, String> recordType,
+ int numRecords,
+ int numChannels) {
+ int[] hits = new int[numChannels];
+ Value value;
+ for (int i = 0; i < numRecords; i++) {
+ if (recordType.isLeft()) {
+ value = new IntValue(i);
+ } else {
+ value = new StringValue(i + "");
+ }
+ Record record = new Record(value);
+ delegate.setInstance(record);
+
+ int[] channels = selector.selectChannels(delegate,
hits.length);
+ for (int channel : channels) {
+ hits[channel]++;
+ }
+ }
+ return hits;
+ }
+
+ private void assertPartitionHashSelectedChannels(
+ ChannelSelector selector,
+ SerializationDelegate<Integer> serializationDelegate,
+ int record,
+ int numChannels) {
+ serializationDelegate.setInstance(record);
+ int[] selectedChannels =
selector.selectChannels(serializationDelegate, numChannels);
+
+ assertTrue(selectedChannels.length == 1);
+ assertTrue(selectedChannels[0] >= 0 && selectedChannels[0] <=
numChannels - 1);
+ }
+
+ private void verifyPartitionHashSelectedChannels(int numRecords, int
numChannels, Either<Integer, String> recordType) {
+ int[] hits =
getSelectedChannels(ShipStrategyType.PARTITION_HASH, numRecords, numChannels,
recordType);
+
+ int cnt = 0;
Review comment:
nit: rename to `totalHitCount`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Deduplicate common codes in OutputEmitterTest
> ---------------------------------------------
>
> Key: FLINK-10942
> URL: https://issues.apache.org/jira/browse/FLINK-10942
> Project: Flink
> Issue Type: Sub-task
> Components: Network, Tests
> Reporter: zhijiang
> Assignee: zhijiang
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.7.1
>
>
> {{OutputEmitterTest}} contains a lot of duplicated code, which makes it
> difficult to maintain. The common code should be factored out to simplify
> the test, which will also benefit the upcoming refactoring of the
> {{ChannelSelector}} interface.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)