This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new f72523c [GOBBLIN-1432] add run with timeout for kafka8 flush when close
f72523c is described below
commit f72523c3ed8b2b05a77accb96d6a845324d90e69
Author: hanghangliu <[email protected]>
AuthorDate: Tue Apr 27 13:08:58 2021 -0700
[GOBBLIN-1432] add run with timeout for kafka8 flush when close
address comments
Closes #3270 from hanghangliu/GOBBLIN-1432-Fix-JVM-hangs-on-flushing-events-after-OOM
---
.../gobblin/metrics/kafka/KafkaProducerPusher.java | 16 ++-
.../metrics/reporter/KafkaProducerPusherTest.java | 125 +++++++++++++++++++++
.../java/org/apache/gobblin/KafkaCommonUtil.java | 62 ++++++++++
3 files changed, 202 insertions(+), 1 deletion(-)
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
index 2772e4a..04a7468 100644
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
+++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java
@@ -24,6 +24,9 @@ import java.util.Queue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.gobblin.KafkaCommonUtil;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -134,7 +137,18 @@ public class KafkaProducerPusher implements Pusher<byte[]> {
public void close()
throws IOException {
log.info("Flushing records before close");
- flush(Long.MAX_VALUE);
+ //Call flush() before invoking close() to ensure any buffered messages are immediately sent. This is required
+ //since close() only guarantees delivery of in-flight messages. Set a timeout to prevent the GOBBLIN-1432 issue.
+ //This issue shouldn't exist in later versions, as the native flush function has a timeout setting offset.flush.timeout.ms
+ try {
+ KafkaCommonUtil.runWithTimeout(
+ () -> flush(Long.MAX_VALUE),
KafkaCommonUtil.KAFKA_FLUSH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ log.warn("Flush records before close was interrupted! Reached {} seconds
timeout!",
+ KafkaCommonUtil.KAFKA_FLUSH_TIMEOUT_SECONDS);
+ } catch (Exception e) {
+ log.error("Exception encountered when flushing record before close", e);
+ }
this.closer.close();
}
diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java
new file mode 100644
index 0000000..c00e483
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import kafka.consumer.ConsumerIterator;
+import org.apache.gobblin.kafka.KafkaTestBase;
+import org.apache.gobblin.metrics.kafka.KafkaProducerPusher;
+import org.apache.gobblin.metrics.kafka.Pusher;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.apache.gobblin.KafkaCommonUtil.*;
+
+
+/**
+ * Test {@link org.apache.gobblin.metrics.kafka.KafkaProducerPusher}.
+ */
+public class KafkaProducerPusherTest {
+ public static final String TOPIC =
KafkaProducerPusherTest.class.getSimpleName();
+
+ private org.apache.gobblin.kafka.KafkaTestBase kafkaTestHelper;
+
+ private final long flushTimeoutMilli = KAFKA_FLUSH_TIMEOUT_SECONDS * 1000;
+
+ @BeforeClass
+ public void setup() throws Exception {
+ kafkaTestHelper = new KafkaTestBase();
+ kafkaTestHelper.startServers();
+
+ kafkaTestHelper.provisionTopic(TOPIC);
+ }
+
+ @Test(priority = 0)
+ public void testPushMessages() throws IOException {
+ // Test that the scoped config overrides the generic config
+ Pusher pusher = new KafkaProducerPusher("127.0.0.1:dummy", TOPIC,
Optional.of(ConfigFactory.parseMap(ImmutableMap.of(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:" +
this.kafkaTestHelper.getKafkaServerPort()))));
+
+ String msg1 = "msg1";
+ String msg2 = "msg2";
+
+ pusher.pushMessages(Lists.newArrayList(msg1.getBytes(), msg2.getBytes()));
+
+ try {
+ Thread.sleep(1000);
+ } catch(InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+
+ ConsumerIterator<byte[], byte[]> iterator =
this.kafkaTestHelper.getIteratorForTopic(TOPIC);
+
+ assert(iterator.hasNext());
+ Assert.assertEquals(new String(iterator.next().message()), msg1);
+ assert(iterator.hasNext());
+ Assert.assertEquals(new String(iterator.next().message()), msg2);
+
+ pusher.close();
+ }
+
+ @Test(priority = 1)
+ public void testCloseTimeOut() throws IOException {
+ // Test that the scoped config overrides the generic config
+ Pusher pusher = new KafkaProducerPusher("127.0.0.1:dummy", TOPIC,
Optional.of(ConfigFactory.parseMap(ImmutableMap.of(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:" +
this.kafkaTestHelper.getKafkaServerPort()))));
+
+ Runnable stuffToDo = new Thread() {
+ @Override
+ public void run() {
+ final long startRunTime = System.currentTimeMillis();
+ String msg = "msg";
+ ArrayList al = Lists.newArrayList(msg.getBytes());
+ // Keep pushing messages for twice as long as the close timeout
+ while ( System.currentTimeMillis() - startRunTime < flushTimeoutMilli
* 2) {
+ pusher.pushMessages(al);
+ }
+ }
+ };
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ executor.submit(stuffToDo);
+ try {
+ Thread.sleep(1000);
+ } catch(InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ long startCloseTime = System.currentTimeMillis();
+ pusher.close();
+ // Assert that close() completes within roughly the flush timeout, even though more messages are still being pushed
+ Assert.assertTrue(System.currentTimeMillis() - startCloseTime <
flushTimeoutMilli + 3000);
+ }
+
+ @AfterClass
+ public void after() {
+ try {
+ this.kafkaTestHelper.close();
+ } catch(Exception e) {
+ System.err.println("Failed to close Kafka server.");
+ }
+ }
+}
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/KafkaCommonUtil.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/KafkaCommonUtil.java
new file mode 100644
index 0000000..2ae059d
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/KafkaCommonUtil.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+
+public class KafkaCommonUtil {
+ public static final long KAFKA_FLUSH_TIMEOUT_SECONDS = 15L;
+
+ public static void runWithTimeout(final Runnable runnable, long timeout,
TimeUnit timeUnit) throws Exception {
+ runWithTimeout(() -> {
+ runnable.run();
+ return null;
+ }, timeout, timeUnit);
+ }
+
+ public static <T> T runWithTimeout(Callable<T> callable, long timeout,
TimeUnit timeUnit) throws Exception {
+ final ExecutorService executor = Executors.newSingleThreadExecutor();
+ final Future<T> future = executor.submit(callable);
+ // This does not cancel the already-scheduled task.
+ executor.shutdown();
+ try {
+ return future.get(timeout, timeUnit);
+ } catch (TimeoutException e) {
+ // stop the running thread
+ future.cancel(true);
+ throw e;
+ } catch (ExecutionException e) {
+ // unwrap the root cause
+ Throwable t = e.getCause();
+ if (t instanceof Error) {
+ throw (Error) t;
+ } else if (t instanceof Exception) {
+ throw (Exception) t;
+ } else {
+ throw new IllegalStateException(t);
+ }
+ }
+ }
+}