[
https://issues.apache.org/jira/browse/BEAM-5172?focusedWorklogId=139172&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-139172
]
ASF GitHub Bot logged work on BEAM-5172:
----------------------------------------
Author: ASF GitHub Bot
Created on: 29/Aug/18 08:39
Start Date: 29/Aug/18 08:39
Worklog Time Spent: 10m
Work Description: asfgit closed pull request #6279: [BEAM-5172] Fix
Elasticsearch UTests flakiness
URL: https://github.com/apache/beam/pull/6279
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 862ba5b4292..9920dde53ba 100644
---
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -21,9 +21,9 @@
import static
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read;
import static
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE;
-import static
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_INDEX;
import static
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_TYPE;
import static
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.NUM_DOCS_UTESTS;
+import static
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.getEsIndex;
import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertEquals;
@@ -61,6 +61,7 @@
private static final Logger LOG =
LoggerFactory.getLogger(ElasticsearchIOTest.class);
private static final String ES_IP = "127.0.0.1";
+ private static final int MAX_STARTUP_WAITING_TIME_MSEC = 5000;
private static Node node;
private static RestClient restClient;
@@ -97,10 +98,25 @@ public static void beforeClass() throws IOException {
node.start();
connectionConfiguration =
ConnectionConfiguration.create(
- new String[] {"http://" + ES_IP + ":" + esHttpPort}, ES_INDEX,
ES_TYPE);
+ new String[] {"http://" + ES_IP + ":" + esHttpPort}, getEsIndex(),
ES_TYPE);
restClient = connectionConfiguration.createClient();
elasticsearchIOTestCommon =
new ElasticsearchIOTestCommon(connectionConfiguration, restClient,
false);
+ int waitingTime = 0;
+ int healthCheckFrequency = 500;
+ while ((waitingTime < MAX_STARTUP_WAITING_TIME_MSEC)
+ && restClient.performRequest("HEAD",
"/").getStatusLine().getStatusCode() != 200) {
+ try {
+ Thread.sleep(healthCheckFrequency);
+ waitingTime += healthCheckFrequency;
+ } catch (InterruptedException e) {
+ LOG.warn(
+ "Waiting thread was interrupted while waiting for connection to
Elasticsearch to be available");
+ }
+ }
+ if (waitingTime >= MAX_STARTUP_WAITING_TIME_MSEC) {
+ throw new IOException("Max startup waiting for embedded Elasticsearch to
start was exceeded");
+ }
}
@AfterClass
diff --git
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index d2791c76d1f..b453b9f4740 100644
---
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -21,12 +21,11 @@
import static
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read;
import static
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE;
-import static
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_INDEX;
import static
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_TYPE;
import static
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.NUM_DOCS_UTESTS;
+import static
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.getEsIndex;
import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
import static org.hamcrest.Matchers.lessThan;
-import static org.junit.Assert.assertEquals;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import java.io.IOException;
@@ -100,7 +99,8 @@ public Settings indexSettings() {
@Before
public void setup() {
if (connectionConfiguration == null) {
- connectionConfiguration =
ConnectionConfiguration.create(fillAddresses(), ES_INDEX, ES_TYPE);
+ connectionConfiguration =
+ ConnectionConfiguration.create(fillAddresses(), getEsIndex(),
ES_TYPE);
elasticsearchIOTestCommon =
new ElasticsearchIOTestCommon(connectionConfiguration,
getRestClient(), false);
}
@@ -112,7 +112,7 @@ public void setup() {
public void testSizes() throws Exception {
// need to create the index using the helper method (not create it at
first insertion)
// for the indexSettings() to be run
- createIndex(ES_INDEX);
+ createIndex(getEsIndex());
elasticsearchIOTestCommon.testSizes();
}
@@ -120,7 +120,7 @@ public void testSizes() throws Exception {
public void testRead() throws Exception {
// need to create the index using the helper method (not create it at
first insertion)
// for the indexSettings() to be run
- createIndex(ES_INDEX);
+ createIndex(getEsIndex());
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testRead();
}
@@ -129,7 +129,7 @@ public void testRead() throws Exception {
public void testReadWithQuery() throws Exception {
// need to create the index using the helper method (not create it at
first insertion)
// for the indexSettings() to be run
- createIndex(ES_INDEX);
+ createIndex(getEsIndex());
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testReadWithQuery();
}
@@ -162,7 +162,7 @@ public void testWriteWithMaxBatchSizeBytes() throws
Exception {
public void testSplit() throws Exception {
//need to create the index using the helper method (not create it at first
insertion)
// for the indexSettings() to be run
- createIndex(ES_INDEX);
+ createIndex(getEsIndex());
ElasticSearchIOTestUtils.insertTestDocuments(
connectionConfiguration, NUM_DOCS_UTESTS, getRestClient());
PipelineOptions options = PipelineOptionsFactory.create();
diff --git
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
index 4b95ea026b2..6867a95004b 100644
---
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
+++
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
@@ -60,8 +60,13 @@ static void deleteIndex(ConnectionConfiguration
connectionConfiguration, RestCli
deleteIndex(restClient, connectionConfiguration.getIndex());
}
+ private static void closeIndex(RestClient restClient, String index) throws
IOException {
+ restClient.performRequest("POST", String.format("/%s/_close", index));
+ }
+
private static void deleteIndex(RestClient restClient, String index) throws
IOException {
try {
+ closeIndex(restClient, index);
restClient.performRequest("DELETE", String.format("/%s", index));
} catch (IOException e) {
// it is fine to ignore this expression as deleteIndex occurs in @before,
diff --git
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOITCommon.java
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOITCommon.java
index 6598e975f35..6ef38bda99a 100644
---
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOITCommon.java
+++
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOITCommon.java
@@ -18,9 +18,9 @@
package org.apache.beam.sdk.io.elasticsearch;
import static
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
-import static
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_INDEX;
import static
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_TYPE;
import static
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.NUM_DOCS_ITESTS;
+import static
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.getEsIndex;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
@@ -54,9 +54,9 @@
/** Enum encapsulating the mode of operation and the index. */
enum IndexMode {
- READ(ES_INDEX),
- WRITE(ES_INDEX + System.currentTimeMillis()),
- WRITE_PARTIAL(ES_INDEX + "_partial_" + System.currentTimeMillis());
+ READ(getEsIndex()),
+ WRITE(getEsIndex() + System.currentTimeMillis()),
+ WRITE_PARTIAL(getEsIndex() + "_partial_" + System.currentTimeMillis());
private final String index;
diff --git
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
index 57b450d8c9e..5a8ad788226 100644
---
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
+++
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
@@ -82,7 +82,10 @@
"{ \"index\" : { \"_index\" : \"test\", \"_type\" : \"doc\", \"_id\" :
\"1\" } }\n"
+ "{ \"field1\" : @ }\n";
- static final String ES_INDEX = "beam";
+ static String getEsIndex() {
+ return "beam" + Thread.currentThread().getId();
+ }
+
static final String ES_TYPE = "test";
static final long NUM_DOCS_UTESTS = 400L;
static final long NUM_DOCS_ITESTS = 50000L;
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 139172)
Time Spent: 1h 20m (was: 1h 10m)
> org.apache.beam.sdk.io.elasticsearch/ElasticsearchIOTest is flaky
> -----------------------------------------------------------------
>
> Key: BEAM-5172
> URL: https://issues.apache.org/jira/browse/BEAM-5172
> Project: Beam
> Issue Type: Bug
> Components: io-java-elasticsearch, test-failures
> Reporter: Valentyn Tymofieiev
> Assignee: Etienne Chauchot
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> In a recent PostCommit builld,
> https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/1290/testReport/junit/org.apache.beam.sdk.io.elasticsearch/ElasticsearchIOTest/testRead/
> failed with:
> Error Message
> java.lang.AssertionError: Count/Flatten.PCollections.out:
> Expected: <400L>
> but: was <470L>
> Stacktrace
> java.lang.AssertionError: Count/Flatten.PCollections.out:
> Expected: <400L>
> but: was <470L>
> at
> org.apache.beam.sdk.testing.PAssert$PAssertionSite.capture(PAssert.java:168)
> at org.apache.beam.sdk.testing.PAssert.thatSingleton(PAssert.java:413)
> at org.apache.beam.sdk.testing.PAssert.thatSingleton(PAssert.java:404)
> at
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.testRead(ElasticsearchIOTestCommon.java:124)
> at
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTest.testRead(ElasticsearchIOTest.java:125)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at
> org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
> at
> org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
> at
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
> at
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
> at
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66)
> at
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
> at
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> at
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
> at
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
> at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
> at
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:109)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
> at
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> at
> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:155)
> at
> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:137)
> at
> org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
> at
> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
> at
> org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at
> org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
> Caused by: java.lang.AssertionError:
> Expected: <400L>
> but: was <470L>
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at org.junit.Assert.assertThat(Assert.java:956)
> at org.junit.Assert.assertThat(Assert.java:923)
> at
> org.apache.beam.sdk.testing.PAssert$AssertIsEqualTo.apply(PAssert.java:1264)
> at
> org.apache.beam.sdk.testing.PAssert$AssertIsEqualTo.apply(PAssert.java:1254)
> at
> org.apache.beam.sdk.testing.PAssert$CheckRelationAgainstExpected.apply(PAssert.java:960)
> at
> org.apache.beam.sdk.testing.PAssert$CheckRelationAgainstExpected.apply(PAssert.java:940)
> at org.apache.beam.sdk.testing.PAssert.doChecks(PAssert.java:1241)
> at
> org.apache.beam.sdk.testing.PAssert$SideInputCheckerDoFn.processElement(PAssert.java:1185)
> at
> org.apache.beam.sdk.testing.PAssert$SideInputCheckerDoFn$DoFnInvoker.invokeProcessElement(Unknown
> Source)
> at
> org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
> at
> org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:237)
> at
> org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:87)
> at
> org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:207)
> at
> org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:55)
> at
> org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160)
> at
> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Standard Error
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)