This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new f3d0dc7  SAMZA-2118: Improve the shutdown sequence of AsyncRunLoop. 
(#935)
f3d0dc7 is described below

commit f3d0dc7a263ed48f6f3ff6fad8bb9b5ea0b9adff
Author: shanthoosh <[email protected]>
AuthorDate: Tue Mar 5 12:12:14 2019 -0800

    SAMZA-2118: Improve the shutdown sequence of AsyncRunLoop. (#935)
    
    * Wake up AsyncRunLoop on shutdown if its waiting for dispatched messages 
to come-back.
    
    * Code clean up.
    
    * Address review comments.
---
 .../java/org/apache/samza/task/AsyncRunLoop.java   | 22 +++---
 .../samza/test/processor/TestTaskApplication.java  | 84 ++++++++++++++++++++++
 .../processor/TestZkLocalApplicationRunner.java    | 32 +++++++++
 3 files changed, 130 insertions(+), 8 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 
b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
index 6c255f1..d209d4c 100644
--- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
@@ -193,6 +193,7 @@ public class AsyncRunLoop implements Runnable, Throttleable 
{
 
   public void shutdown() {
     shutdownNow = true;
+    resume();
   }
 
   /**
@@ -220,15 +221,17 @@ public class AsyncRunLoop implements Runnable, 
Throttleable {
    * Insert the envelope into the task pending queues and run all the tasks
    */
   private void runTasks(IncomingMessageEnvelope envelope) {
-    if (envelope != null) {
-      PendingEnvelope pendingEnvelope = new PendingEnvelope(envelope);
-      for (AsyncTaskWorker worker : 
sspToTaskWorkerMapping.get(envelope.getSystemStreamPartition())) {
-        worker.state.insertEnvelope(pendingEnvelope);
+    if (!shutdownNow) {
+      if (envelope != null) {
+        PendingEnvelope pendingEnvelope = new PendingEnvelope(envelope);
+        for (AsyncTaskWorker worker : 
sspToTaskWorkerMapping.get(envelope.getSystemStreamPartition())) {
+          worker.state.insertEnvelope(pendingEnvelope);
+        }
       }
-    }
 
-    for (AsyncTaskWorker worker: taskWorkers) {
-      worker.run();
+      for (AsyncTaskWorker worker: taskWorkers) {
+        worker.run();
+      }
     }
   }
 
@@ -280,7 +283,10 @@ public class AsyncRunLoop implements Runnable, 
Throttleable {
   }
 
   /**
-   * Resume the runloop thread. It is triggered once a task becomes ready 
again or has failure.
+   * Resume the runloop thread. This API is triggered in the following 
scenarios:
+   * A. A task becomes ready to process a message.
+   * B. A task has failed when processing a message.
+   * C. User thread shuts down the run loop.
    */
   private void resume() {
     log.trace("Resume loop thread");
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestTaskApplication.java
 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestTaskApplication.java
new file mode 100644
index 0000000..57a061f
--- /dev/null
+++ 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestTaskApplication.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.processor;
+
+import java.util.concurrent.CountDownLatch;
+import org.apache.samza.application.TaskApplication;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
+import org.apache.samza.task.AsyncStreamTask;
+import org.apache.samza.task.AsyncStreamTaskFactory;
+import org.apache.samza.task.ClosableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCallback;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.test.table.TestTableData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestTaskApplication implements TaskApplication {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestTaskApplication.class);
+
+  private final String systemName;
+  private final String inputTopic;
+  private final String outputTopic;
+  private final CountDownLatch shutdownLatch;
+  private final CountDownLatch processedMessageLatch;
+
+  public TestTaskApplication(String systemName, String inputTopic, String 
outputTopic,
+      CountDownLatch processedMessageLatch, CountDownLatch shutdownLatch) {
+    this.systemName = systemName;
+    this.inputTopic = inputTopic;
+    this.outputTopic = outputTopic;
+    this.processedMessageLatch = processedMessageLatch;
+    this.shutdownLatch = shutdownLatch;
+  }
+
+  private class TestTaskImpl implements AsyncStreamTask, ClosableTask {
+
+    @Override
+    public void processAsync(IncomingMessageEnvelope envelope, 
MessageCollector collector, TaskCoordinator coordinator, TaskCallback callback) 
{
+      processedMessageLatch.countDown();
+      // Implementation does not invokes callback.complete to block the 
AsyncRunLoop.process() after it exhausts the
+      // `task.max.concurrency` defined per task.
+    }
+
+    @Override
+    public void close() {
+      LOG.info("Task instance is shutting down.");
+      shutdownLatch.countDown();
+    }
+  }
+
+  @Override
+  public void describe(TaskApplicationDescriptor appDescriptor) {
+    KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(systemName);
+    KafkaInputDescriptor<TestTableData.Profile> inputDescriptor = 
ksd.getInputDescriptor(inputTopic, new NoOpSerde<>());
+    KafkaOutputDescriptor<TestTableData.EnrichedPageView> outputDescriptor = 
ksd.getOutputDescriptor(outputTopic, new NoOpSerde<>());
+    appDescriptor.withInputStream(inputDescriptor)
+                 .withOutputStream(outputDescriptor)
+                 .withTaskFactory((AsyncStreamTaskFactory) () -> new 
TestTaskImpl());
+  }
+}
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index 8249bcc..44c7722 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -45,6 +45,7 @@ import org.I0Itec.zkclient.ZkClient;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.samza.Partition;
+import org.apache.samza.application.TaskApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ClusterManagerConfig;
@@ -940,6 +941,37 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
     Assert.assertEquals(32, jobModel.maxChangeLogStreamPartitions);
   }
 
+  @Test
+  public void 
testApplicationShutdownShouldBeIndependentOfPerMessageProcessingTime() throws 
Exception {
+    publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
+
+    // Create a TaskApplication with only one task per container.
+    // The task does not invokes taskCallback.complete for any of the 
dispatched message.
+    CountDownLatch shutdownLatch = new CountDownLatch(1);
+    CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
+
+    TaskApplication taskApplication = new TestTaskApplication(TEST_SYSTEM, 
inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, shutdownLatch);
+    MapConfig taskApplicationConfig = new 
MapConfig(ImmutableList.of(applicationConfig1,
+        ImmutableMap.of(TaskConfig.MAX_CONCURRENCY(), "1", 
JobConfig.SSP_GROUPER_FACTORY(), 
"org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory")));
+    ApplicationRunner appRunner = 
ApplicationRunners.getApplicationRunner(taskApplication, taskApplicationConfig);
+
+    // Run the application.
+    executeRun(appRunner, applicationConfig1);
+
+    // Wait for the task to receive at least one dispatched message.
+    processedMessagesLatch1.await();
+
+    // Kill the application when none of the dispatched messages is 
acknowledged as completed by the task.
+    appRunner.kill();
+    appRunner.waitForFinish();
+
+    // Expect the shutdown latch to be triggered.
+    shutdownLatch.await();
+
+    // Assert that the shutdown was successful.
+    Assert.assertEquals(ApplicationStatus.SuccessfulFinish, 
appRunner.status());
+  }
+
   /**
    * Computes the task to partition assignment of the {@param JobModel}.
    * @param jobModel the jobModel to compute task to partition assignment for.

Reply via email to