ajothomas commented on code in PR #1639:
URL: https://github.com/apache/samza/pull/1639#discussion_r1013123821
##########
samza-test/src/test/java/org/apache/samza/test/drain/DrainHighLevelApiIntegrationTest.java:
##########
@@ -74,20 +74,30 @@ public void describe(StreamApplicationDescriptor
appDescriptor) {
.map(KV::getValue)
.partitionBy(PageView::getMemberId, pv -> pv,
KVSerde.of(new IntegerSerde(), new
TestTableData.PageViewJsonSerde()), "p1")
+ .map(kv -> KV.of(kv.getKey() * 31, kv.getValue()))
+ .partitionBy(KV::getKey, KV::getValue, KVSerde.of(new
IntegerSerde(), new TestTableData.PageViewJsonSerde()), "p2")
.sink((m, collector, coordinator) -> {
RECEIVED.add(m.getValue());
});
}
}
- // The test can be occasionally flaky, so we set Ignore annotation
- // Remove ignore annotation and run the test as follows:
- // ./gradlew :samza-test:test --tests
org.apache.samza.test.drain.DrainHighLevelApiIntegrationTest -PscalaSuffix=2.12
+ /**
+ * This test will test drain and consumption of some messages from the
in-memory topic.
+ * In order to simulate the real-world behaviour of drain, the test adds
messages to the in-memory topic buffer periodically
+ * in a delayed fashion instead of all at once. The test then writes the
drain notification message to the in-memory
+ * metadata store to drain and stop the pipeline. This write is done shortly
after the pipeline starts and before all
+ * the messages are written to the topic's buffer. As a result, the total
count of the processed messages will be less
+ * than the expected count of messages.
+ * */
@Ignore
@Test
- public void testPipeline() {
+ public void testDrain() {
Review Comment:
Tests have been turned on now after a few tweaks.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]