This is an automated email from the ASF dual-hosted git repository.
rgoers pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/flume.git
The following commit(s) were added to refs/heads/trunk by this push:
new d504510 Fix issues in Linux VM. Provide more info to debug Travis
d504510 is described below
commit d5045106272103d06b3a4bd15eed92123b31a38a
Author: Ralph Goers <[email protected]>
AuthorDate: Tue Jan 25 10:57:55 2022 -0800
Fix issues in Linux VM. Provide more info to debug Travis
---
flume-ng-channels/flume-file-channel/pom.xml | 2 +-
.../flume/channel/file/TestFileChannelRestart.java | 5 +--
.../org/apache/flume/channel/file/TestUtils.java | 36 +++++++++++++++++++---
.../org/apache/flume/source/TestExecSource.java | 2 +-
.../apache/flume/source/TestSyslogTcpSource.java | 2 +-
.../shared/kafka/test/KafkaPartitionTestUtil.java | 14 ++++++---
6 files changed, 47 insertions(+), 14 deletions(-)
diff --git a/flume-ng-channels/flume-file-channel/pom.xml
b/flume-ng-channels/flume-file-channel/pom.xml
index dc698d8..b18a818 100644
--- a/flume-ng-channels/flume-file-channel/pom.xml
+++ b/flume-ng-channels/flume-file-channel/pom.xml
@@ -34,7 +34,7 @@
<properties>
<!-- TODO fix spotbugs violations -->
<spotbugs.maxAllowedViolations>86</spotbugs.maxAllowedViolations>
- <pmd.maxAllowedViolations>486</pmd.maxAllowedViolations>
+ <pmd.maxAllowedViolations>544</pmd.maxAllowedViolations>
</properties>
<dependencies>
diff --git
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
index 80c3013..210ae46 100644
---
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
+++
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
@@ -48,6 +48,7 @@ import static
org.apache.flume.channel.file.TestUtils.compareInputAndOut;
import static org.apache.flume.channel.file.TestUtils.consumeChannel;
import static org.apache.flume.channel.file.TestUtils.fillChannel;
import static org.apache.flume.channel.file.TestUtils.forceCheckpoint;
+import static org.apache.flume.channel.file.TestUtils.doForcedCheckpoint;
import static org.apache.flume.channel.file.TestUtils.putEvents;
import static org.apache.flume.channel.file.TestUtils.putWithoutCommit;
import static org.apache.flume.channel.file.TestUtils.takeEvents;
@@ -823,12 +824,12 @@ public class TestFileChannelRestart extends
TestFileChannelBase {
Set<String> in = putEvents(channel, "restart", 10, 100);
Assert.assertEquals(100, in.size());
slowdownBackup(channel);
- forceCheckpoint(channel);
+ doForcedCheckpoint(channel);
in = putEvents(channel, "restart", 10, 100);
takeEvents(channel, 10, 100);
Assert.assertEquals(100, in.size());
try {
- forceCheckpoint(channel);
+ doForcedCheckpoint(channel);
} catch (ReflectionError ex) {
throw ex.getCause();
} finally {
diff --git
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
index 0ec1831..057518d 100644
---
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
+++
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
@@ -126,6 +126,33 @@ public class TestUtils {
}
public static void forceCheckpoint(FileChannel channel) {
+ // The compiler doesn't know that an IOException can be thrown here so
won't let us catch it or
+ // even check for it normally.
+ // If the checkpoint backup is in progress, then retry.
+ String ioeClass = IOException.class.getName();
+ for (int i = 0; i < 10; ++i) {
+ try {
+ doForcedCheckpoint(channel);
+ break;
+ } catch (Throwable ioe) {
+ Throwable cause = ioe.getCause();
+ if (cause != null && cause.getClass().getName().equals(ioeClass)) {
+ String message = cause.getMessage();
+ if (message != null && message.startsWith("Previous backup")) {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException ex) {
+ // Ignore it.
+ }
+ continue;
+ }
+ }
+ throw ioe;
+ }
+ }
+ }
+
+ public static void doForcedCheckpoint(FileChannel channel) {
Log log = field("log")
.ofType(Log.class)
.in(channel)
@@ -133,12 +160,13 @@ public class TestUtils {
Assert.assertTrue("writeCheckpoint returned false",
method("writeCheckpoint")
- .withReturnType(Boolean.class)
- .withParameterTypes(Boolean.class)
- .in(log)
- .invoke(true));
+ .withReturnType(Boolean.class)
+ .withParameterTypes(Boolean.class)
+ .in(log)
+ .invoke(true));
}
+
public static Set<String> takeEvents(Channel channel, int batchSize) throws
Exception {
return takeEvents(channel, batchSize, false);
}
diff --git
a/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
b/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
index 36a2921..563a25e 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
@@ -451,11 +451,11 @@ public class TestExecSource {
}
return result;
} finally {
+ int exit = process.waitFor();
process.destroy();
if (reader != null) {
reader.close();
}
- int exit = process.waitFor();
if (exit != 0) {
throw new IllegalStateException("Command [" + command + "] exited with
" + exit);
}
diff --git
a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
index 6bbc91f..637fc80 100644
---
a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
+++
b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
@@ -316,7 +316,7 @@ public class TestSyslogTcpSource {
String headerValue = headers.get(headerName);
if (TEST_CLIENT_HOSTNAME_HEADER.equals(headerName)) {
if (!"localhost".equals(headerValue) &&
!"127.0.0.1".equals(headerValue)) {
- fail("Expected either 'localhost' or '127.0.0.1'");
+ fail("Expected either 'localhost' or '127.0.0.1', got: " +
headerValue);
}
} else {
assertEquals("Event header value does not match: " + headerName,
diff --git
a/flume-shared/flume-shared-kafka-test/src/main/java/org/apache/flume/shared/kafka/test/KafkaPartitionTestUtil.java
b/flume-shared/flume-shared-kafka-test/src/main/java/org/apache/flume/shared/kafka/test/KafkaPartitionTestUtil.java
index 66a55fe..946c0bb 100644
---
a/flume-shared/flume-shared-kafka-test/src/main/java/org/apache/flume/shared/kafka/test/KafkaPartitionTestUtil.java
+++
b/flume-shared/flume-shared-kafka-test/src/main/java/org/apache/flume/shared/kafka/test/KafkaPartitionTestUtil.java
@@ -69,7 +69,7 @@ public class KafkaPartitionTestUtil {
} else {
// Since Kafka 2.4 results with no partition are not distrubuted
evenly.
int sum = resultsMap.values().stream().mapToInt(List::size).sum();
- Assert.assertEquals("Incorrect number of messages", numMsgs, sum);
+ Assert.assertEquals("Scenario: " + scenario + " Incorrect number of
messages", numMsgs, sum);
return;
}
}
@@ -79,18 +79,22 @@ public class KafkaPartitionTestUtil {
if (scenario == PartitionTestScenario.PARTITION_ID_HEADER_ONLY ||
scenario == PartitionTestScenario.STATIC_HEADER_AND_PARTITION_ID) {
// In these two scenarios we're checking against partitionMap
- Assert.assertEquals(expectedResults.size(), actualResults.size());
+ Assert.assertEquals("Scenario: " + scenario + " Partition " + ptn + "
incorrect",
+ expectedResults.size(), actualResults.size());
//Go and check the message payload is what we wanted it to be
for (int idx = 0; idx < expectedResults.size(); idx++) {
- Assert.assertArrayEquals(expectedResults.get(idx).getBody(),
actualResults.get(idx));
+ Assert.assertArrayEquals("Scenario: " + scenario + " Partition " +
ptn + " event " + idx
+ + " incorrect", expectedResults.get(idx).getBody(),
actualResults.get(idx));
}
} else if (scenario == PartitionTestScenario.STATIC_HEADER_ONLY) {
// Check that if we are looking in the statically assigned partition
// all messages are in it, else all other partitions are zero
if (ptn == staticPtn) {
- Assert.assertEquals(numMsgs, actualResults.size());
+ Assert.assertEquals("Scenario: " + scenario + " incorrect number of
messages in partition " +
+ ptn, numMsgs, actualResults.size());
} else {
- Assert.assertEquals(0, actualResults.size());
+ Assert.assertEquals("Scenario: " + scenario + " partition " + ptn +
"should have no messages",
+ 0, actualResults.size());
}
}
}