xinyuiscool commented on code in PR #1639:
URL: https://github.com/apache/samza/pull/1639#discussion_r1010901435
##########
samza-test/src/test/java/org/apache/samza/test/drain/DrainHighLevelApiIntegrationTest.java:
##########
@@ -74,20 +74,30 @@ public void describe(StreamApplicationDescriptor appDescriptor) {
.map(KV::getValue)
.partitionBy(PageView::getMemberId, pv -> pv,
KVSerde.of(new IntegerSerde(), new TestTableData.PageViewJsonSerde()), "p1")
+ .map(kv -> KV.of(kv.getKey() * 31, kv.getValue()))
+ .partitionBy(KV::getKey, KV::getValue, KVSerde.of(new IntegerSerde(), new TestTableData.PageViewJsonSerde()), "p2")
.sink((m, collector, coordinator) -> {
RECEIVED.add(m.getValue());
});
}
}
- // The test can be occasionally flaky, so we set Ignore annotation
- // Remove ignore annotation and run the test as follows:
- // ./gradlew :samza-test:test --tests org.apache.samza.test.drain.DrainHighLevelApiIntegrationTest -PscalaSuffix=2.12
+ /**
+ * This test will test drain and consumption of some messages from the in-memory topic.
+ * In order to simulate the real-world behaviour of drain, the test adds messages to the in-memory topic buffer periodically
+ * in a delayed fashion instead of all at once. The test then writes the drain notification message to the in-memory
+ * metadata store to drain and stop the pipeline. This write is done shortly after the pipeline starts and before all
+ * the messages are written to the topic's buffer. As a result, the total count of the processed messages will be less
+ * than the expected count of messages.
+ * */
@Ignore
@Test
- public void testPipeline() {
+ public void testDrain() {
Review Comment:
It seems most of the integration tests are ignored because they rely on
wall-clock time. Is it possible to enable a couple of them so that we don't
need this timed wait? I'm not sure whether this TestRunner supports
waitForFinish(), but Samza runners have this API, so in theory we should be
able to use it.
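A minimal, self-contained sketch of the idea behind waiting for completion instead of sleeping. It does not use Samza: `FakePipeline`, `start`, `waitForFinish`, and `received` are hypothetical stand-ins, and a `CountDownLatch` plays the role a runner's `waitForFinish()` would play. Rather than sleeping for a fixed wall-clock interval and hoping processing is done, the test blocks until the pipeline signals completion, keeping a timeout only as a safety net.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class WaitForFinishSketch {

  // Hypothetical stand-in for a running pipeline; not a Samza API.
  static class FakePipeline {
    private final CountDownLatch finished = new CountDownLatch(1);
    private final List<Integer> received = Collections.synchronizedList(new ArrayList<>());

    // Processes all input on a background thread, then signals completion.
    void start(List<Integer> input) {
      Thread worker = new Thread(() -> {
        for (Integer m : input) {
          received.add(m);
        }
        finished.countDown(); // completion signal, analogous to the job reaching a final state
      });
      worker.start();
    }

    // Blocks until the pipeline finishes or the timeout elapses; true if it finished.
    boolean waitForFinish(long timeout, TimeUnit unit) {
      try {
        return finished.await(timeout, unit);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve interrupt status for the caller
        return false;
      }
    }

    List<Integer> received() {
      return received;
    }
  }

  public static void main(String[] args) {
    FakePipeline pipeline = new FakePipeline();
    pipeline.start(List.of(1, 2, 3));
    // No Thread.sleep(...): the wait returns as soon as the pipeline signals completion.
    boolean done = pipeline.waitForFinish(10, TimeUnit.SECONDS);
    System.out.println("finished=" + done + ", received=" + pipeline.received().size());
  }
}
```

Running `main` prints `finished=true, received=3`. The timeout only bounds the worst case. A passing run takes as long as the work itself rather than a fixed sleep, which is what makes such tests less sensitive to wall-clock timing.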
##########
samza-core/src/main/java/org/apache/samza/container/RunLoop.java:
##########
@@ -875,49 +876,40 @@ private boolean shouldDrain() {
return false;
}
- if (!pendingEnvelopeQueue.isEmpty()) {
- PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek();
- IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
+ if (pendingEnvelopeQueue.size() > 0) {
+ final PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek();
+ final IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
if (envelope.isDrain()) {
final DrainMessage message = (DrainMessage) envelope.getMessage();
if (!message.getRunId().equals(runId)) {
- // Removing the drain message from the pending queue as it doesn't match with the current runId
- // Removing it will ensure that it is not picked up by process()
- pendingEnvelopeQueue.remove();
+ // Removing the drain message from the pending queue as it doesn't match with the current deployment
+ final PendingEnvelope discardedDrainMessage = pendingEnvelopeQueue.remove();
+ consumerMultiplexer.tryUpdate(discardedDrainMessage.envelope.getSystemStreamPartition());
} else {
+ // Found drain message matching the current deployment
+
// set the RunLoop to drain mode
if (!isDraining) {
drain();
}
- if (elasticityFactor <= 1) {
- SystemStreamPartition ssp = envelope.getSystemStreamPartition();
- processingSspSetToDrain.remove(ssp);
- } else {
- // SystemConsumers will write only one envelope (enclosing DrainMessage) per SSP in its buffer.
- // This envelope doesn't have keybucket info it's SSP. With elasticity, the same SSP can be processed by
- // multiple tasks. Therefore, if envelope contains drain message, the ssp of envelope should be removed
- // from task's processing set irrespective of keyBucket.
- SystemStreamPartition sspOfEnvelope = envelope.getSystemStreamPartition();
- Optional<SystemStreamPartition> ssp = processingSspSetToDrain.stream()
- .filter(sspInSet -> sspInSet.getSystemStream().equals(sspOfEnvelope.getSystemStream())
- && sspInSet.getPartition().equals(sspOfEnvelope.getPartition()))
- .findFirst();
- ssp.ifPresent(processingSspSetToDrain::remove);
- }
if (!hasIntermediateStreams) {
- // Don't remove from the pending queue as we want the DAG to pick up Drain message and propagate it to
- // intermediate streams
+ // The flow below only applies to samza low-level API
+
+ // For high-level API, we do not remove the message from pending queue.
+ // It will be picked by the process flow instead of drain flow, as we want the drain control message
+ // to be processed by the High-level API Operator DAG.
+
+
processingSspSetToDrain.remove(envelope.getSystemStreamPartition());
pendingEnvelopeQueue.remove();
+ return processingSspSetToDrain.isEmpty();
Review Comment:
It seems we return this at the end anyway. Do we need another return here?
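A simplified, hypothetical model of the question. It assumes the method ends with the same `return processingSspSetToDrain.isEmpty();` and that no other code runs between this branch and that final return. `Envelope`, `shouldDrainWithEarlyReturn`, `shouldDrainSingleReturn`, and `queue` are illustrative names, not the real `RunLoop` code, which has more branches. Under that assumption, the early return and the single exit point behave identically.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

public class EarlyReturnSketch {

  // Hypothetical, simplified envelope: an SSP name plus a drain flag (not Samza's type).
  static final class Envelope {
    final String ssp;
    final boolean isDrain;

    Envelope(String ssp, boolean isDrain) {
      this.ssp = ssp;
      this.isDrain = isDrain;
    }
  }

  // Shape of the code under review: an early return inside the branch...
  static boolean shouldDrainWithEarlyReturn(Deque<Envelope> pending, Set<String> sspsToDrain) {
    if (!pending.isEmpty()) {
      Envelope head = pending.peek();
      if (head.isDrain) {
        sspsToDrain.remove(head.ssp);
        pending.remove();
        return sspsToDrain.isEmpty(); // early return
      }
    }
    // ...followed by the same expression at the end of the method.
    return sspsToDrain.isEmpty();
  }

  // Same logic with a single exit point.
  static boolean shouldDrainSingleReturn(Deque<Envelope> pending, Set<String> sspsToDrain) {
    if (!pending.isEmpty()) {
      Envelope head = pending.peek();
      if (head.isDrain) {
        sspsToDrain.remove(head.ssp);
        pending.remove();
      }
    }
    return sspsToDrain.isEmpty();
  }

  // Builds a fresh pending queue so each call starts from the same state.
  static Deque<Envelope> queue(Envelope... envelopes) {
    return new ArrayDeque<>(Arrays.asList(envelopes));
  }

  public static void main(String[] args) {
    String[][] sspSets = {{"p0"}, {"p0", "p1"}};
    boolean[] drainFlags = {true, false};
    for (String[] ssps : sspSets) {
      for (boolean drain : drainFlags) {
        boolean early = shouldDrainWithEarlyReturn(
            queue(new Envelope("p0", drain)), new HashSet<>(Arrays.asList(ssps)));
        boolean single = shouldDrainSingleReturn(
            queue(new Envelope("p0", drain)), new HashSet<>(Arrays.asList(ssps)));
        System.out.println(Arrays.toString(ssps) + " drain=" + drain + ": " + early + " / " + single);
      }
    }
  }
}
```

Each printed line shows the same value for both versions. If the real method does more work after this block, for example processing other envelopes, the two forms are no longer equivalent, and the early return would be doing real work.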
##########
samza-core/src/main/java/org/apache/samza/container/RunLoop.java:
##########
@@ -875,49 +876,40 @@ private boolean shouldDrain() {
return false;
}
- if (!pendingEnvelopeQueue.isEmpty()) {
- PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek();
- IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
+ if (pendingEnvelopeQueue.size() > 0) {
+ final PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek();
+ final IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
if (envelope.isDrain()) {
final DrainMessage message = (DrainMessage) envelope.getMessage();
if (!message.getRunId().equals(runId)) {
- // Removing the drain message from the pending queue as it doesn't match with the current runId
- // Removing it will ensure that it is not picked up by process()
- pendingEnvelopeQueue.remove();
+ // Removing the drain message from the pending queue as it doesn't match with the current deployment
+ final PendingEnvelope discardedDrainMessage = pendingEnvelopeQueue.remove();
+ consumerMultiplexer.tryUpdate(discardedDrainMessage.envelope.getSystemStreamPartition());
} else {
+ // Found drain message matching the current deployment
+
// set the RunLoop to drain mode
if (!isDraining) {
drain();
}
- if (elasticityFactor <= 1) {
- SystemStreamPartition ssp = envelope.getSystemStreamPartition();
- processingSspSetToDrain.remove(ssp);
- } else {
- // SystemConsumers will write only one envelope (enclosing DrainMessage) per SSP in its buffer.
- // This envelope doesn't have keybucket info it's SSP. With elasticity, the same SSP can be processed by
- // multiple tasks. Therefore, if envelope contains drain message, the ssp of envelope should be removed
- // from task's processing set irrespective of keyBucket.
- SystemStreamPartition sspOfEnvelope = envelope.getSystemStreamPartition();
- Optional<SystemStreamPartition> ssp = processingSspSetToDrain.stream()
- .filter(sspInSet -> sspInSet.getSystemStream().equals(sspOfEnvelope.getSystemStream())
- && sspInSet.getPartition().equals(sspOfEnvelope.getPartition()))
- .findFirst();
- ssp.ifPresent(processingSspSetToDrain::remove);
- }
if (!hasIntermediateStreams) {
Review Comment:
It's not very safe to use `hasIntermediateStreams` to decide whether a job uses
the high-level or low-level API. A high-level job may have no intermediate
streams but still have state that needs to be drained.
Do we even support drain on the low-level API? I don't think we have such
support, since watermarks don't mean anything there. If we only use drain on
high-level APIs, I think we can safely delete this check.
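A small, hypothetical illustration of the concern. According to the comments in the diff, the matching drain message is left in the pending queue for the operator DAG only when `hasIntermediateStreams` is true. A high-level job with a stateful operator but no `partitionBy` (and so, assuming no other repartitioning, no intermediate streams) would therefore have its drain handled outside the DAG. `JobInfo`, `ApiType`, and both routing methods are illustrative names, not Samza APIs. Deciding by API type is only one possible alternative. If drain is supported only on the high-level API, the check can simply be deleted.

```java
import java.util.List;

public class DrainRoutingSketch {

  // Which API a job was written against (illustrative enum, not a Samza type).
  enum ApiType { HIGH_LEVEL, LOW_LEVEL }

  // Hypothetical job summary used only for this illustration.
  static final class JobInfo {
    final ApiType apiType;
    final List<String> intermediateStreams;
    final boolean hasStatefulOperators;

    JobInfo(ApiType apiType, List<String> intermediateStreams, boolean hasStatefulOperators) {
      this.apiType = apiType;
      this.intermediateStreams = intermediateStreams;
      this.hasStatefulOperators = hasStatefulOperators;
    }
  }

  // Mirrors the check under review: only jobs with intermediate streams keep the
  // drain message in the pending queue for the operator DAG.
  static boolean drainGoesThroughDagByProxy(JobInfo job) {
    return !job.intermediateStreams.isEmpty();
  }

  // Alternative: decide by the API type itself. If drain is only supported on the
  // high-level API, this is always true and the check can be removed.
  static boolean drainGoesThroughDagByApiType(JobInfo job) {
    return job.apiType == ApiType.HIGH_LEVEL;
  }

  public static void main(String[] args) {
    // High-level job with a stateful operator (e.g. a window) but no partitionBy.
    JobInfo windowedJob = new JobInfo(ApiType.HIGH_LEVEL, List.of(), true);
    System.out.println("proxy=" + drainGoesThroughDagByProxy(windowedJob)
        + ", apiType=" + drainGoesThroughDagByApiType(windowedJob));
    if (windowedJob.hasStatefulOperators && !drainGoesThroughDagByProxy(windowedJob)) {
      System.out.println("proxy check skips the DAG for a stateful high-level job");
    }
  }
}
```

For the windowed job, the proxy reports `false` while the API-type check reports `true`. This is the misclassification the comment describes: the drain message would bypass the operators whose state needs flushing.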
--
This is an automated message from the Apache Git Service.