[
https://issues.apache.org/jira/browse/BEAM-3949?focusedWorklogId=113953&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113953
]
ASF GitHub Bot logged work on BEAM-3949:
----------------------------------------
Author: ASF GitHub Bot
Created on: 20/Jun/18 21:50
Start Date: 20/Jun/18 21:50
Worklog Time Spent: 10m
Work Description: chamikaramj closed pull request #5434: [BEAM-3949]
IOIT's setup() and teardown() db connection attempt sometimes fail resulting in
test flakiness
URL: https://github.com/apache/beam/pull/5434
This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:
diff --git
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOITHelper.java
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOITHelper.java
index 5d0971fab02..0efb43b51d0 100644
---
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOITHelper.java
+++
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOITHelper.java
@@ -21,33 +21,81 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-/**
- * Methods common to all types of IOITs.
- */
+/** Methods common to all types of IOITs. */
public class IOITHelper {
+ private static final Logger LOG = LoggerFactory.getLogger(IOITHelper.class);
+ private static final int maxAttempts = 3;
+ private static final long minDelay = 1_000;
- private IOITHelper() {
- }
+ private IOITHelper() {}
public static String getHashForRecordCount(int recordCount, Map<Integer,
String> hashes) {
String hash = hashes.get(recordCount);
if (hash == null) {
throw new UnsupportedOperationException(
- String.format("No hash for that record count: %s", recordCount)
- );
+ String.format("No hash for that record count: %s", recordCount));
}
return hash;
}
public static <T extends IOTestPipelineOptions> T readIOTestPipelineOptions(
- Class<T> optionsType) {
+ Class<T> optionsType) {
PipelineOptionsFactory.register(optionsType);
- IOTestPipelineOptions options = TestPipeline
- .testingPipelineOptions()
- .as(optionsType);
+ IOTestPipelineOptions options =
TestPipeline.testingPipelineOptions().as(optionsType);
return PipelineOptionsValidator.validate(optionsType, options);
}
+
+ /** Interface for passing any method to executeWithRetry function. */
+ @FunctionalInterface
+ public interface RetryFunction {
+ void run() throws Exception;
+ }
+
+ /**
+ * This function executes the method and retries it in case of failure.
+ *
+ * @param function The function to retry
+ * @throws Exception
+ */
+ public static void executeWithRetry(RetryFunction function) throws Exception
{
+ executeWithRetry(maxAttempts, minDelay, function);
+ }
+
+ /**
+ * This function executes the method and retries it in case of failure. The
method is retried when
+ * an exception is thrown and it does not depend on the error response. This
can be used for tests
+ * which await for infrastructure setup i.e. database connection.
+ *
+ * @param maxAttempts The number of retry attempts
+ * @param minDelay Minimal delay which will grow exponentially
+ * @param function The function to retry
+ * @throws Exception
+ */
+ public static void executeWithRetry(int maxAttempts, long minDelay,
RetryFunction function)
+ throws Exception {
+ int attempts = 1;
+ long delay = minDelay;
+
+ while (attempts <= maxAttempts) {
+ try {
+ function.run();
+ return;
+ } catch (Exception e) {
+ LOG.warn("Attempt #{} of {} failed: {}.", attempts, maxAttempts,
e.getMessage());
+ if (attempts == maxAttempts) {
+ throw e;
+ } else {
+ long nextDelay = (long) Math.pow(2, attempts) * delay;
+ LOG.warn("Retrying in {} ms.", nextDelay);
+ Thread.sleep(nextDelay);
+ }
+ attempts++;
+ }
+ }
+ }
}
diff --git
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOITHelperTest.java
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOITHelperTest.java
new file mode 100644
index 00000000000..7901209ac72
--- /dev/null
+++
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOITHelperTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.common;
+
+import static org.apache.beam.sdk.io.common.IOITHelper.executeWithRetry;
+import static org.junit.Assert.assertEquals;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for functions in {@link IOITHelper}. */
+@RunWith(JUnit4.class)
+public class IOITHelperTest {
+ private static long startTimeMeasure;
+ private static String message = "";
+ private static ArrayList<Exception> listOfExceptionsThrown;
+
+ @Rule public ExpectedException exceptionRule = ExpectedException.none();
+
+ @Before
+ public void setup() {
+ listOfExceptionsThrown = new ArrayList<>();
+ }
+
+ @Test
+ public void retryHealthyFunction() throws Exception {
+ executeWithRetry(IOITHelperTest::validFunction);
+ assertEquals("The healthy function.", message);
+ }
+
+ @Test
+ public void retryFunctionThatWillFail() throws Exception {
+ exceptionRule.expect(SQLException.class);
+ exceptionRule.expectMessage("Problem with connection");
+ executeWithRetry(IOITHelperTest::failingFunction);
+ assertEquals(3, listOfExceptionsThrown.size());
+ }
+
+ @Test
+ public void retryFunctionThatFailsWithMoreAttempts() throws Exception {
+ exceptionRule.expect(SQLException.class);
+ exceptionRule.expectMessage("Problem with connection");
+ executeWithRetry(4, 1_000, IOITHelperTest::failingFunction);
+ assertEquals(4, listOfExceptionsThrown.size());
+ }
+
+ @Test
+ public void retryFunctionThatRecovers() throws Exception {
+ startTimeMeasure = System.currentTimeMillis();
+ executeWithRetry(IOITHelperTest::recoveringFunction);
+ assertEquals(1, listOfExceptionsThrown.size());
+ }
+
+ @Test
+ public void retryFunctionThatRecoversAfterBiggerDelay() throws Exception {
+ startTimeMeasure = System.currentTimeMillis();
+ executeWithRetry(3, 2_000,
IOITHelperTest::recoveringFunctionWithBiggerDelay);
+ assertEquals(1, listOfExceptionsThrown.size());
+ }
+
+ private static void failingFunction() throws SQLException {
+ SQLException e = new SQLException("Problem with connection");
+ listOfExceptionsThrown.add(e);
+ throw e;
+ }
+
+ private static void recoveringFunction() throws SQLException {
+ if (System.currentTimeMillis() - startTimeMeasure < 1_001) {
+ SQLException e = new SQLException("Problem with connection");
+ listOfExceptionsThrown.add(e);
+ throw e;
+ }
+ }
+
+ private static void recoveringFunctionWithBiggerDelay() throws SQLException {
+ if (System.currentTimeMillis() - startTimeMeasure < 2_001) {
+ SQLException e = new SQLException("Problem with connection");
+ listOfExceptionsThrown.add(e);
+ throw e;
+ }
+ }
+
+ private static void validFunction() throws SQLException {
+ message = "The healthy function.";
+ }
+}
diff --git
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java
index b1ffec4ddad..27d5f3e322b 100644
---
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java
+++
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.hadoop.inputformat;
+import static org.apache.beam.sdk.io.common.IOITHelper.executeWithRetry;
import static
org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
import static
org.apache.beam.sdk.io.common.TestRow.DeterministicallyConstructTestRowFn;
import static org.apache.beam.sdk.io.common.TestRow.SelectNameFn;
@@ -86,7 +87,7 @@
public TestPipeline readPipeline = TestPipeline.create();
@BeforeClass
- public static void setUp() throws SQLException {
+ public static void setUp() throws Exception {
PostgresIOTestPipelineOptions options = readIOTestPipelineOptions(
PostgresIOTestPipelineOptions.class);
@@ -94,10 +95,14 @@ public static void setUp() throws SQLException {
numberOfRows = options.getNumberOfRecords();
tableName = DatabaseTestHelper.getTestTableName("HadoopInputFormatIOIT");
- DatabaseTestHelper.createTable(dataSource, tableName);
+ executeWithRetry(HadoopInputFormatIOIT::createTable);
setupHadoopConfiguration(options);
}
+ private static void createTable() throws SQLException {
+ DatabaseTestHelper.createTable(dataSource, tableName);
+ }
+
private static void setupHadoopConfiguration(PostgresIOTestPipelineOptions
options) {
Configuration conf = new Configuration();
DBConfiguration.configureDB(
@@ -120,7 +125,11 @@ private static void
setupHadoopConfiguration(PostgresIOTestPipelineOptions optio
}
@AfterClass
- public static void tearDown() throws SQLException {
+ public static void tearDown() throws Exception {
+ executeWithRetry(HadoopInputFormatIOIT::deleteTable);
+ }
+
+ private static void deleteTable() throws SQLException {
DatabaseTestHelper.deleteTable(dataSource, tableName);
}
diff --git
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
index 7123ed4ebab..30f9d849473 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.jdbc;
+import static org.apache.beam.sdk.io.common.IOITHelper.executeWithRetry;
import static
org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
import java.sql.SQLException;
@@ -68,25 +69,32 @@
private static int numberOfRows;
private static PGSimpleDataSource dataSource;
private static String tableName;
-
@Rule
public TestPipeline pipelineWrite = TestPipeline.create();
@Rule
public TestPipeline pipelineRead = TestPipeline.create();
@BeforeClass
- public static void setup() throws SQLException {
+ public static void setup() throws Exception {
PostgresIOTestPipelineOptions options = readIOTestPipelineOptions(
PostgresIOTestPipelineOptions.class);
numberOfRows = options.getNumberOfRecords();
dataSource = DatabaseTestHelper.getPostgresDataSource(options);
tableName = DatabaseTestHelper.getTestTableName("IT");
+ executeWithRetry(JdbcIOIT::createTable);
+ }
+
+ private static void createTable() throws SQLException {
DatabaseTestHelper.createTable(dataSource, tableName);
}
@AfterClass
- public static void tearDown() throws SQLException {
+ public static void tearDown() throws Exception {
+ executeWithRetry(JdbcIOIT::deleteTable);
+ }
+
+ private static void deleteTable() throws SQLException {
DatabaseTestHelper.deleteTable(dataSource, tableName);
}
diff --git
a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java
b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java
index 1753ad5489c..22af72a4ebc 100644
---
a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java
+++
b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.mongodb;
+import static org.apache.beam.sdk.io.common.IOITHelper.executeWithRetry;
import static org.apache.beam.sdk.io.common.IOITHelper.getHashForRecordCount;
import com.google.common.collect.ImmutableMap;
@@ -106,7 +107,11 @@ public static void setUp() {
}
@AfterClass
- public static void tearDown() {
+ public static void tearDown() throws Exception {
+ executeWithRetry(MongoDBIOIT::dropDatabase);
+ }
+
+ public static void dropDatabase() throws Exception {
new MongoClient(options.getMongoDBHostName())
.getDatabase(options.getMongoDBDatabaseName())
.drop();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 113953)
Time Spent: 10.5h (was: 10h 20m)
> IOIT's setup() and teardown() db connection attempt sometimes fail resulting
> in test flakiness
> ----------------------------------------------------------------------------------------------
>
> Key: BEAM-3949
> URL: https://issues.apache.org/jira/browse/BEAM-3949
> Project: Beam
> Issue Type: Sub-task
> Components: testing
> Reporter: Łukasz Gajowy
> Assignee: Kasia Kucharczyk
> Priority: Major
> Time Spent: 10.5h
> Remaining Estimate: 0h
>
> setup() and teardown() methods sometimes have trouble connecting to the
> database in Performance tests, which results in test flakiness.
> Example logs:
> [https://builds.apache.org/view/A-D/view/Beam/job/beam_PerformanceTests_HadoopInputFormat/65/console]
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)