This is an automated email from the ASF dual-hosted git repository.
shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new f3d0dc7 SAMZA-2118: Improve the shutdown sequence of AsyncRunLoop.
(#935)
f3d0dc7 is described below
commit f3d0dc7a263ed48f6f3ff6fad8bb9b5ea0b9adff
Author: shanthoosh <[email protected]>
AuthorDate: Tue Mar 5 12:12:14 2019 -0800
SAMZA-2118: Improve the shutdown sequence of AsyncRunLoop. (#935)
* Wake up AsyncRunLoop on shutdown if it is waiting for dispatched messages
to come back.
* Code clean up.
* Address review comments.
---
.../java/org/apache/samza/task/AsyncRunLoop.java | 22 +++---
.../samza/test/processor/TestTaskApplication.java | 84 ++++++++++++++++++++++
.../processor/TestZkLocalApplicationRunner.java | 32 +++++++++
3 files changed, 130 insertions(+), 8 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
index 6c255f1..d209d4c 100644
--- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
@@ -193,6 +193,7 @@ public class AsyncRunLoop implements Runnable, Throttleable
{
public void shutdown() {
shutdownNow = true;
+ resume();
}
/**
@@ -220,15 +221,17 @@ public class AsyncRunLoop implements Runnable,
Throttleable {
* Insert the envelope into the task pending queues and run all the tasks
*/
private void runTasks(IncomingMessageEnvelope envelope) {
- if (envelope != null) {
- PendingEnvelope pendingEnvelope = new PendingEnvelope(envelope);
- for (AsyncTaskWorker worker :
sspToTaskWorkerMapping.get(envelope.getSystemStreamPartition())) {
- worker.state.insertEnvelope(pendingEnvelope);
+ if (!shutdownNow) {
+ if (envelope != null) {
+ PendingEnvelope pendingEnvelope = new PendingEnvelope(envelope);
+ for (AsyncTaskWorker worker :
sspToTaskWorkerMapping.get(envelope.getSystemStreamPartition())) {
+ worker.state.insertEnvelope(pendingEnvelope);
+ }
}
- }
- for (AsyncTaskWorker worker: taskWorkers) {
- worker.run();
+ for (AsyncTaskWorker worker: taskWorkers) {
+ worker.run();
+ }
}
}
@@ -280,7 +283,10 @@ public class AsyncRunLoop implements Runnable,
Throttleable {
}
/**
- * Resume the runloop thread. It is triggered once a task becomes ready
again or has failure.
+ * Resume the runloop thread. This API is triggered in the following
scenarios:
+ * A. A task becomes ready to process a message.
+ * B. A task has failed when processing a message.
+ * C. User thread shuts down the run loop.
*/
private void resume() {
log.trace("Resume loop thread");
diff --git
a/samza-test/src/test/java/org/apache/samza/test/processor/TestTaskApplication.java
b/samza-test/src/test/java/org/apache/samza/test/processor/TestTaskApplication.java
new file mode 100644
index 0000000..57a061f
--- /dev/null
+++
b/samza-test/src/test/java/org/apache/samza/test/processor/TestTaskApplication.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.processor;
+
+import java.util.concurrent.CountDownLatch;
+import org.apache.samza.application.TaskApplication;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
+import org.apache.samza.task.AsyncStreamTask;
+import org.apache.samza.task.AsyncStreamTaskFactory;
+import org.apache.samza.task.ClosableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCallback;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.test.table.TestTableData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestTaskApplication implements TaskApplication {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TestTaskApplication.class);
+
+ private final String systemName;
+ private final String inputTopic;
+ private final String outputTopic;
+ private final CountDownLatch shutdownLatch;
+ private final CountDownLatch processedMessageLatch;
+
+ public TestTaskApplication(String systemName, String inputTopic, String
outputTopic,
+ CountDownLatch processedMessageLatch, CountDownLatch shutdownLatch) {
+ this.systemName = systemName;
+ this.inputTopic = inputTopic;
+ this.outputTopic = outputTopic;
+ this.processedMessageLatch = processedMessageLatch;
+ this.shutdownLatch = shutdownLatch;
+ }
+
+ private class TestTaskImpl implements AsyncStreamTask, ClosableTask {
+
+ @Override
+ public void processAsync(IncomingMessageEnvelope envelope,
MessageCollector collector, TaskCoordinator coordinator, TaskCallback callback)
{
+ processedMessageLatch.countDown();
+ // Implementation intentionally does not invoke callback.complete, so that
AsyncRunLoop.process() blocks after it exhausts the
+ // `task.max.concurrency` defined per task.
+ }
+
+ @Override
+ public void close() {
+ LOG.info("Task instance is shutting down.");
+ shutdownLatch.countDown();
+ }
+ }
+
+ @Override
+ public void describe(TaskApplicationDescriptor appDescriptor) {
+ KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(systemName);
+ KafkaInputDescriptor<TestTableData.Profile> inputDescriptor =
ksd.getInputDescriptor(inputTopic, new NoOpSerde<>());
+ KafkaOutputDescriptor<TestTableData.EnrichedPageView> outputDescriptor =
ksd.getOutputDescriptor(outputTopic, new NoOpSerde<>());
+ appDescriptor.withInputStream(inputDescriptor)
+ .withOutputStream(outputDescriptor)
+ .withTaskFactory((AsyncStreamTaskFactory) () -> new
TestTaskImpl());
+ }
+}
diff --git
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index 8249bcc..44c7722 100644
---
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -45,6 +45,7 @@ import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.samza.Partition;
+import org.apache.samza.application.TaskApplication;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.ClusterManagerConfig;
@@ -940,6 +941,37 @@ public class TestZkLocalApplicationRunner extends
StandaloneIntegrationTestHarne
Assert.assertEquals(32, jobModel.maxChangeLogStreamPartitions);
}
+ @Test
+ public void
testApplicationShutdownShouldBeIndependentOfPerMessageProcessingTime() throws
Exception {
+ publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
+
+ // Create a TaskApplication with only one task per container.
+ // The task does not invoke taskCallback.complete for any of the
dispatched messages.
+ CountDownLatch shutdownLatch = new CountDownLatch(1);
+ CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
+
+ TaskApplication taskApplication = new TestTaskApplication(TEST_SYSTEM,
inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, shutdownLatch);
+ MapConfig taskApplicationConfig = new
MapConfig(ImmutableList.of(applicationConfig1,
+ ImmutableMap.of(TaskConfig.MAX_CONCURRENCY(), "1",
JobConfig.SSP_GROUPER_FACTORY(),
"org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory")));
+ ApplicationRunner appRunner =
ApplicationRunners.getApplicationRunner(taskApplication, taskApplicationConfig);
+
+ // Run the application.
+ executeRun(appRunner, applicationConfig1);
+
+ // Wait for the task to receive at least one dispatched message.
+ processedMessagesLatch1.await();
+
+ // Kill the application when none of the dispatched messages is
acknowledged as completed by the task.
+ appRunner.kill();
+ appRunner.waitForFinish();
+
+ // Expect the shutdown latch to be triggered.
+ shutdownLatch.await();
+
+ // Assert that the shutdown was successful.
+ Assert.assertEquals(ApplicationStatus.SuccessfulFinish,
appRunner.status());
+ }
+
/**
* Computes the task to partition assignment of the {@param JobModel}.
* @param jobModel the jobModel to compute task to partition assignment for.