This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a2f2975578b53c2ae91a3fd66ce3a2761447a7fe
Author: Chesnay Schepler <[email protected]>
AuthorDate: Mon Jun 15 13:05:03 2020 +0200

    [FLINK-18301][e2e] Backup kafka logs on failure
---
 .../util/kafka/LocalStandaloneKafkaResource.java   | 38 ++++++++++++++++++++--
 .../kafka/LocalStandaloneKafkaResourceFactory.java | 18 +++++++++-
 2 files changed, 52 insertions(+), 4 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
index 7b0ecf0..e1a3ff4 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
@@ -21,6 +21,7 @@ package org.apache.flink.tests.util.kafka;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.tests.util.AutoClosableProcess;
 import org.apache.flink.tests.util.CommandLineWrapper;
+import org.apache.flink.tests.util.TestUtils;
 import org.apache.flink.tests.util.activation.OperatingSystemRestriction;
 import org.apache.flink.tests.util.cache.DownloadCache;
 import org.apache.flink.util.OperatingSystem;
@@ -29,6 +30,8 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.PrintStream;
@@ -45,6 +48,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
@@ -72,12 +76,15 @@ public class LocalStandaloneKafkaResource implements KafkaResource {
        private final DownloadCache downloadCache = DownloadCache.get();
        private final String kafkaVersion;
        private Path kafkaDir;
+       @Nullable
+       private Path logBackupDirectory;
 
-       LocalStandaloneKafkaResource(final String kafkaVersion) {
+       LocalStandaloneKafkaResource(final String kafkaVersion, @Nullable Path 
logBackupDirectory) {
                OperatingSystemRestriction.forbid(
                        String.format("The %s relies on UNIX utils and shell 
scripts.", getClass().getSimpleName()),
                        OperatingSystem.WINDOWS);
                this.kafkaVersion = kafkaVersion;
+               this.logBackupDirectory = logBackupDirectory;
        }
 
        private static String getKafkaDownloadUrl(final String kafkaVersion) {
@@ -162,6 +169,20 @@ public class LocalStandaloneKafkaResource implements KafkaResource {
 
        @Override
        public void afterTestSuccess() {
+               shutdownResource();
+               downloadCache.afterTestSuccess();
+               tmp.delete();
+       }
+
+       @Override
+       public void afterTestFailure() {
+               shutdownResource();
+               backupLogs();
+               downloadCache.afterTestFailure();
+               tmp.delete();
+       }
+
+       private void shutdownResource() {
                try {
                        AutoClosableProcess.runBlocking(
                                kafkaDir.resolve(Paths.get("bin", 
"kafka-server-stop.sh")).toString()
@@ -192,8 +213,19 @@ public class LocalStandaloneKafkaResource implements KafkaResource {
                } catch (IOException ioe) {
                        LOG.warn("Error while shutting down zookeeper.", ioe);
                }
-               downloadCache.afterTestSuccess();
-               tmp.delete();
+       }
+
+       private void backupLogs() {
+               if (logBackupDirectory != null) {
+                       final Path targetDirectory = 
logBackupDirectory.resolve("kafka-" + UUID.randomUUID().toString());
+                       try {
+                               Files.createDirectories(targetDirectory);
+                               
TestUtils.copyDirectory(kafkaDir.resolve("logs"), targetDirectory);
+                               LOG.info("Backed up logs to {}.", 
targetDirectory);
+                       } catch (IOException e) {
+                               LOG.warn("An error has occurred while backing 
up logs to {}.", targetDirectory, e);
+                       }
+               }
        }
 
        private static boolean isZookeeperRunning(final Path kafkaDir) {
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResourceFactory.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResourceFactory.java
index 74ef5a5..073d26e 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResourceFactory.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResourceFactory.java
@@ -18,13 +18,29 @@
 
 package org.apache.flink.tests.util.kafka;
 
+import org.apache.flink.tests.util.parameters.ParameterProperty;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Optional;
+
 /**
  * A {@link KafkaResourceFactory} for the {@link 
LocalStandaloneKafkaResourceFactory}.
  */
 public final class LocalStandaloneKafkaResourceFactory implements 
KafkaResourceFactory {
+       private static final Logger LOG = 
LoggerFactory.getLogger(LocalStandaloneKafkaResourceFactory.class);
+
+       private static final ParameterProperty<Path> 
DISTRIBUTION_LOG_BACKUP_DIRECTORY = new ParameterProperty<>("logBackupDir", 
Paths::get);
 
        @Override
        public KafkaResource create(final String kafkaVersion) {
-               return new LocalStandaloneKafkaResource(kafkaVersion);
+               Optional<Path> logBackupDirectory = 
DISTRIBUTION_LOG_BACKUP_DIRECTORY.get();
+               if (!logBackupDirectory.isPresent()) {
+                       LOG.warn("Property {} not set, logs will not be backed 
up in case of test failures.", 
DISTRIBUTION_LOG_BACKUP_DIRECTORY.getPropertyName());
+               }
+               return new LocalStandaloneKafkaResource(kafkaVersion, 
logBackupDirectory.orElse(null));
        }
 }

Reply via email to