This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f72523c  [GOBBLIN-1432] add run with timeout for kafka8 flush when close
f72523c is described below

commit f72523c3ed8b2b05a77accb96d6a845324d90e69
Author: hanghangliu <[email protected]>
AuthorDate: Tue Apr 27 13:08:58 2021 -0700

    [GOBBLIN-1432] add run with timeout for kafka8 flush when close
    
    address comments
    
    Closes #3270 from hanghangliu/GOBBLIN-1432-Fix-JVM-hangs-on-flushing-events-after-OOM
---
 .../gobblin/metrics/kafka/KafkaProducerPusher.java |  16 ++-
 .../metrics/reporter/KafkaProducerPusherTest.java  | 125 +++++++++++++++++++++
 .../java/org/apache/gobblin/KafkaCommonUtil.java   |  62 ++++++++++
 3 files changed, 202 insertions(+), 1 deletion(-)

diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
index 2772e4a..04a7468 100644
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
+++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
@@ -24,6 +24,9 @@ import java.util.Queue;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingDeque;
 
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.gobblin.KafkaCommonUtil;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -134,7 +137,27 @@ public class KafkaProducerPusher implements Pusher<byte[]> {
   public void close()
       throws IOException {
     log.info("Flushing records before close");
-    flush(Long.MAX_VALUE);
-    this.closer.close();
+    // Call flush() before closing so that buffered messages are sent immediately; close() only guarantees
+    // delivery of in-flight messages. The flush is bounded by a timeout so that it cannot block close()
+    // indefinitely (GOBBLIN-1432: JVM hangs on flushing events after OOM).
+    // NOTE(review): later Kafka versions were assumed to bound flush via offset.flush.timeout.ms, but that is a
+    // Kafka Connect worker setting rather than a producer setting -- confirm before removing this timeout.
+    try {
+      KafkaCommonUtil.runWithTimeout(
+          () -> flush(Long.MAX_VALUE), KafkaCommonUtil.KAFKA_FLUSH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    } catch (TimeoutException e) {
+      log.warn("Flushing records before close timed out after {} seconds; closing anyway",
+          KafkaCommonUtil.KAFKA_FLUSH_TIMEOUT_SECONDS);
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so that callers can still observe the interruption.
+      Thread.currentThread().interrupt();
+      log.warn("Interrupted while flushing records before close", e);
+    } catch (Exception e) {
+      log.error("Exception encountered when flushing records before close", e);
+    } finally {
+      // Release resources even when the flush failed with an Error (e.g. OutOfMemoryError), which
+      // runWithTimeout rethrows and the catch clauses above do not handle.
+      this.closer.close();
+    }
   }
 
diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java
new file mode 100644
index 0000000..c00e483
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import kafka.consumer.ConsumerIterator;
+import org.apache.gobblin.kafka.KafkaTestBase;
+import org.apache.gobblin.metrics.kafka.KafkaProducerPusher;
+import org.apache.gobblin.metrics.kafka.Pusher;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.apache.gobblin.KafkaCommonUtil.*;
+
+
+/**
+ * Test {@link org.apache.gobblin.metrics.kafka.KafkaProducerPusher}.
+ */
+public class KafkaProducerPusherTest {
+  public static final String TOPIC = KafkaProducerPusherTest.class.getSimpleName();
+
+  private KafkaTestBase kafkaTestHelper;
+
+  private final long flushTimeoutMilli = KAFKA_FLUSH_TIMEOUT_SECONDS * 1000;
+
+  @BeforeClass
+  public void setup() throws Exception {
+    kafkaTestHelper = new KafkaTestBase();
+    kafkaTestHelper.startServers();
+
+    kafkaTestHelper.provisionTopic(TOPIC);
+  }
+
+  @Test(priority = 0)
+  public void testPushMessages() throws IOException {
+    Pusher<byte[]> pusher = createPusher();
+    String msg1 = "msg1";
+    String msg2 = "msg2";
+
+    try {
+      pusher.pushMessages(
+          Lists.newArrayList(msg1.getBytes(StandardCharsets.UTF_8), msg2.getBytes(StandardCharsets.UTF_8)));
+      sleepQuietly(1000);
+
+      ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);
+
+      // TestNG assertions always run, unlike the assert keyword, which depends on the JVM's -ea flag.
+      Assert.assertTrue(iterator.hasNext());
+      Assert.assertEquals(new String(iterator.next().message(), StandardCharsets.UTF_8), msg1);
+      Assert.assertTrue(iterator.hasNext());
+      Assert.assertEquals(new String(iterator.next().message(), StandardCharsets.UTF_8), msg2);
+    } finally {
+      pusher.close();
+    }
+  }
+
+  @Test(priority = 1)
+  public void testCloseTimeOut() throws IOException {
+    Pusher<byte[]> pusher = createPusher();
+    List<byte[]> messages = Lists.newArrayList("msg".getBytes(StandardCharsets.UTF_8));
+
+    // Keep pushing messages for twice the close timeout so that records are still pending while close() runs.
+    Runnable keepPushing = () -> {
+      final long startRunTime = System.currentTimeMillis();
+      while (!Thread.currentThread().isInterrupted()
+          && System.currentTimeMillis() - startRunTime < flushTimeoutMilli * 2) {
+        pusher.pushMessages(messages);
+      }
+    };
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    try {
+      executor.submit(keepPushing);
+      sleepQuietly(1000);
+
+      long startCloseTime = System.currentTimeMillis();
+      pusher.close();
+      // close() should return around the flush timeout, even though more messages are still being pushed.
+      Assert.assertTrue(System.currentTimeMillis() - startCloseTime < flushTimeoutMilli + 3000);
+    } finally {
+      // Stop the background pusher so that it does not outlive this test.
+      executor.shutdownNow();
+    }
+  }
+
+  /**
+   * Creates a pusher whose constructor broker list ({@code 127.0.0.1:dummy}) is deliberately unusable; the scoped
+   * config's {@link ProducerConfig#BOOTSTRAP_SERVERS_CONFIG} points at the embedded test broker and is expected to
+   * override it.
+   */
+  private Pusher<byte[]> createPusher() {
+    return new KafkaProducerPusher("127.0.0.1:dummy", TOPIC, Optional.of(ConfigFactory.parseMap(ImmutableMap.of(
+        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:" + this.kafkaTestHelper.getKafkaServerPort()))));
+  }
+
+  /** Sleeps for {@code millis} milliseconds, restoring the interrupt flag if the sleep is interrupted. */
+  private static void sleepQuietly(long millis) {
+    try {
+      Thread.sleep(millis);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @AfterClass
+  public void after() {
+    try {
+      this.kafkaTestHelper.close();
+    } catch (Exception e) {
+      System.err.println("Failed to close Kafka server.");
+    }
+  }
+}
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/KafkaCommonUtil.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/KafkaCommonUtil.java
new file mode 100644
index 0000000..2ae059d
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/KafkaCommonUtil.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+
+/**
+ * Helpers shared by the Gobblin Kafka modules.
+ */
+public class KafkaCommonUtil {
+  /** Timeout, in seconds, used to bound Kafka producer flushes, e.g. the flush performed when a pusher closes. */
+  public static final long KAFKA_FLUSH_TIMEOUT_SECONDS = 15L;
+
+  /** Name of the worker thread created by {@link #runWithTimeout(Callable, long, TimeUnit)}. */
+  private static final String TIMEOUT_THREAD_NAME = "KafkaCommonUtil-runWithTimeout";
+
+  /**
+   * Runs {@code runnable} on a separate thread, waiting at most {@code timeout} for it to finish.
+   *
+   * @see #runWithTimeout(Callable, long, TimeUnit)
+   */
+  public static void runWithTimeout(final Runnable runnable, long timeout, TimeUnit timeUnit) throws Exception {
+    runWithTimeout(() -> {
+      runnable.run();
+      return null;
+    }, timeout, timeUnit);
+  }
+
+  /**
+   * Runs {@code callable} on a new daemon thread, waiting at most {@code timeout} for its result.
+   *
+   * <p>If the wait times out or the calling thread is interrupted, the task is cancelled with interruption. A task
+   * that ignores interrupts may keep running in the background; because the worker is a daemon thread, such a task
+   * does not prevent the JVM from exiting.
+   *
+   * @param callable the task to run
+   * @param timeout the maximum time to wait
+   * @param timeUnit the unit of {@code timeout}
+   * @param <T> the result type of {@code callable}
+   * @return the value returned by {@code callable}
+   * @throws TimeoutException if {@code callable} did not complete within the timeout
+   * @throws InterruptedException if the calling thread was interrupted while waiting
+   * @throws Exception the exception thrown by {@code callable}, unwrapped from its {@link ExecutionException}
+   */
+  public static <T> T runWithTimeout(Callable<T> callable, long timeout, TimeUnit timeUnit) throws Exception {
+    final ExecutorService executor = Executors.newSingleThreadExecutor(task -> {
+      Thread worker = new Thread(task, TIMEOUT_THREAD_NAME);
+      // A non-daemon worker stuck in a task that ignores interruption would keep the JVM alive, which is the
+      // very hang this timeout exists to prevent.
+      worker.setDaemon(true);
+      return worker;
+    });
+    final Future<T> future = executor.submit(callable);
+    // Reject further submissions. This does not cancel the already-scheduled task; it lets the worker thread
+    // terminate once that task is done.
+    executor.shutdown();
+    try {
+      return future.get(timeout, timeUnit);
+    } catch (TimeoutException | InterruptedException e) {
+      // Ask the running task to stop; whether it does depends on the task honoring interruption.
+      future.cancel(true);
+      throw e;
+    } catch (ExecutionException e) {
+      // Unwrap the task's own failure so that callers see the original exception or error.
+      Throwable cause = e.getCause();
+      if (cause instanceof Error) {
+        throw (Error) cause;
+      } else if (cause instanceof Exception) {
+        throw (Exception) cause;
+      } else {
+        throw new IllegalStateException(cause);
+      }
+    }
+  }
+}

Reply via email to