This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit a2f2975578b53c2ae91a3fd66ce3a2761447a7fe Author: Chesnay Schepler <[email protected]> AuthorDate: Mon Jun 15 13:05:03 2020 +0200 [FLINK-18301][e2e] Backup kafka logs on failure --- .../util/kafka/LocalStandaloneKafkaResource.java | 38 ++++++++++++++++++++-- .../kafka/LocalStandaloneKafkaResourceFactory.java | 18 +++++++++- 2 files changed, 52 insertions(+), 4 deletions(-) diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java index 7b0ecf0..e1a3ff4 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java @@ -21,6 +21,7 @@ package org.apache.flink.tests.util.kafka; import org.apache.flink.api.common.time.Deadline; import org.apache.flink.tests.util.AutoClosableProcess; import org.apache.flink.tests.util.CommandLineWrapper; +import org.apache.flink.tests.util.TestUtils; import org.apache.flink.tests.util.activation.OperatingSystemRestriction; import org.apache.flink.tests.util.cache.DownloadCache; import org.apache.flink.util.OperatingSystem; @@ -29,6 +30,8 @@ import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.IOException; import java.io.OutputStreamWriter; import java.io.PrintStream; @@ -45,6 +48,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -72,12 +76,15 @@ public class LocalStandaloneKafkaResource implements KafkaResource { private final DownloadCache downloadCache = DownloadCache.get(); private final String kafkaVersion; private Path kafkaDir; + @Nullable + private Path logBackupDirectory; - LocalStandaloneKafkaResource(final String kafkaVersion) { + LocalStandaloneKafkaResource(final String kafkaVersion, @Nullable Path logBackupDirectory) { OperatingSystemRestriction.forbid( String.format("The %s relies on UNIX utils and shell scripts.", getClass().getSimpleName()), OperatingSystem.WINDOWS); this.kafkaVersion = kafkaVersion; + this.logBackupDirectory = logBackupDirectory; } private static String getKafkaDownloadUrl(final String kafkaVersion) { @@ -162,6 +169,20 @@ public class LocalStandaloneKafkaResource implements KafkaResource { @Override public void afterTestSuccess() { + shutdownResource(); + downloadCache.afterTestSuccess(); + tmp.delete(); + } + + @Override + public void afterTestFailure() { + shutdownResource(); + backupLogs(); + downloadCache.afterTestFailure(); + tmp.delete(); + } + + private void shutdownResource() { try { AutoClosableProcess.runBlocking( kafkaDir.resolve(Paths.get("bin", "kafka-server-stop.sh")).toString() @@ -192,8 +213,19 @@ public class LocalStandaloneKafkaResource implements KafkaResource { } catch (IOException ioe) { LOG.warn("Error while shutting down zookeeper.", ioe); } - downloadCache.afterTestSuccess(); - tmp.delete(); + } + + private void backupLogs() { + if (logBackupDirectory != null) { + final Path targetDirectory = logBackupDirectory.resolve("kafka-" + UUID.randomUUID().toString()); + try { + Files.createDirectories(targetDirectory); + TestUtils.copyDirectory(kafkaDir.resolve("logs"), targetDirectory); + LOG.info("Backed up logs to {}.", targetDirectory); + } catch (IOException e) { + LOG.warn("An error has occurred while backing up logs to {}.", targetDirectory, e); + } + } } private static boolean isZookeeperRunning(final Path kafkaDir) { diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResourceFactory.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResourceFactory.java index 74ef5a5..073d26e 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResourceFactory.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResourceFactory.java @@ -18,13 +18,29 @@ package org.apache.flink.tests.util.kafka; +import org.apache.flink.tests.util.parameters.ParameterProperty; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Optional; + /** * A {@link KafkaResourceFactory} for the {@link LocalStandaloneKafkaResourceFactory}. */ public final class LocalStandaloneKafkaResourceFactory implements KafkaResourceFactory { + private static final Logger LOG = LoggerFactory.getLogger(LocalStandaloneKafkaResourceFactory.class); + + private static final ParameterProperty<Path> DISTRIBUTION_LOG_BACKUP_DIRECTORY = new ParameterProperty<>("logBackupDir", Paths::get); @Override public KafkaResource create(final String kafkaVersion) { - return new LocalStandaloneKafkaResource(kafkaVersion); + Optional<Path> logBackupDirectory = DISTRIBUTION_LOG_BACKUP_DIRECTORY.get(); + if (!logBackupDirectory.isPresent()) { + LOG.warn("Property {} not set, logs will not be backed up in case of test failures.", DISTRIBUTION_LOG_BACKUP_DIRECTORY.getPropertyName()); + } + return new LocalStandaloneKafkaResource(kafkaVersion, logBackupDirectory.orElse(null)); } }
