[ 
https://issues.apache.org/jira/browse/BEAM-5172?focusedWorklogId=139172&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-139172
 ]

ASF GitHub Bot logged work on BEAM-5172:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Aug/18 08:39
            Start Date: 29/Aug/18 08:39
    Worklog Time Spent: 10m 
      Work Description: asfgit closed pull request #6279:  [BEAM-5172] Fix 
Elasticsearch UTests flakiness
URL: https://github.com/apache/beam/pull/6279
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 862ba5b4292..9920dde53ba 100644
--- 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -21,9 +21,9 @@
 import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
 import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read;
 import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE;
-import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_INDEX;
 import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_TYPE;
 import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.NUM_DOCS_UTESTS;
+import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.getEsIndex;
 import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
 import static org.hamcrest.Matchers.lessThan;
 import static org.junit.Assert.assertEquals;
@@ -61,6 +61,7 @@
   private static final Logger LOG = 
LoggerFactory.getLogger(ElasticsearchIOTest.class);
 
   private static final String ES_IP = "127.0.0.1";
+  private static final int MAX_STARTUP_WAITING_TIME_MSEC = 5000;
 
   private static Node node;
   private static RestClient restClient;
@@ -97,10 +98,25 @@ public static void beforeClass() throws IOException {
     node.start();
     connectionConfiguration =
         ConnectionConfiguration.create(
-            new String[] {"http://" + ES_IP + ":" + esHttpPort}, ES_INDEX, 
ES_TYPE);
+            new String[] {"http://" + ES_IP + ":" + esHttpPort}, getEsIndex(), 
ES_TYPE);
     restClient = connectionConfiguration.createClient();
     elasticsearchIOTestCommon =
         new ElasticsearchIOTestCommon(connectionConfiguration, restClient, 
false);
+    int waitingTime = 0;
+    int healthCheckFrequency = 500;
+    while ((waitingTime < MAX_STARTUP_WAITING_TIME_MSEC)
+        && restClient.performRequest("HEAD", 
"/").getStatusLine().getStatusCode() != 200) {
+      try {
+        Thread.sleep(healthCheckFrequency);
+        waitingTime += healthCheckFrequency;
+      } catch (InterruptedException e) {
+        LOG.warn(
+            "Waiting thread was interrupted while waiting for connection to 
Elasticsearch to be available");
+      }
+    }
+    if (waitingTime >= MAX_STARTUP_WAITING_TIME_MSEC) {
+      throw new IOException("Max startup waiting for embedded Elasticsearch to 
start was exceeded");
+    }
   }
 
   @AfterClass
diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index d2791c76d1f..b453b9f4740 100644
--- 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -21,12 +21,11 @@
 import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
 import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read;
 import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE;
-import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_INDEX;
 import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_TYPE;
 import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.NUM_DOCS_UTESTS;
+import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.getEsIndex;
 import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
 import static org.hamcrest.Matchers.lessThan;
-import static org.junit.Assert.assertEquals;
 
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
 import java.io.IOException;
@@ -100,7 +99,8 @@ public Settings indexSettings() {
   @Before
   public void setup() {
     if (connectionConfiguration == null) {
-      connectionConfiguration = 
ConnectionConfiguration.create(fillAddresses(), ES_INDEX, ES_TYPE);
+      connectionConfiguration =
+          ConnectionConfiguration.create(fillAddresses(), getEsIndex(), 
ES_TYPE);
       elasticsearchIOTestCommon =
           new ElasticsearchIOTestCommon(connectionConfiguration, 
getRestClient(), false);
     }
@@ -112,7 +112,7 @@ public void setup() {
   public void testSizes() throws Exception {
     // need to create the index using the helper method (not create it at 
first insertion)
     // for the indexSettings() to be run
-    createIndex(ES_INDEX);
+    createIndex(getEsIndex());
     elasticsearchIOTestCommon.testSizes();
   }
 
@@ -120,7 +120,7 @@ public void testSizes() throws Exception {
   public void testRead() throws Exception {
     // need to create the index using the helper method (not create it at 
first insertion)
     // for the indexSettings() to be run
-    createIndex(ES_INDEX);
+    createIndex(getEsIndex());
     elasticsearchIOTestCommon.setPipeline(pipeline);
     elasticsearchIOTestCommon.testRead();
   }
@@ -129,7 +129,7 @@ public void testRead() throws Exception {
   public void testReadWithQuery() throws Exception {
     // need to create the index using the helper method (not create it at 
first insertion)
     // for the indexSettings() to be run
-    createIndex(ES_INDEX);
+    createIndex(getEsIndex());
     elasticsearchIOTestCommon.setPipeline(pipeline);
     elasticsearchIOTestCommon.testReadWithQuery();
   }
@@ -162,7 +162,7 @@ public void testWriteWithMaxBatchSizeBytes() throws 
Exception {
   public void testSplit() throws Exception {
     //need to create the index using the helper method (not create it at first 
insertion)
     // for the indexSettings() to be run
-    createIndex(ES_INDEX);
+    createIndex(getEsIndex());
     ElasticSearchIOTestUtils.insertTestDocuments(
         connectionConfiguration, NUM_DOCS_UTESTS, getRestClient());
     PipelineOptions options = PipelineOptionsFactory.create();
diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
index 4b95ea026b2..6867a95004b 100644
--- 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
@@ -60,8 +60,13 @@ static void deleteIndex(ConnectionConfiguration 
connectionConfiguration, RestCli
     deleteIndex(restClient, connectionConfiguration.getIndex());
   }
 
+  private static void closeIndex(RestClient restClient, String index) throws 
IOException {
+    restClient.performRequest("POST", String.format("/%s/_close", index));
+  }
+
   private static void deleteIndex(RestClient restClient, String index) throws 
IOException {
     try {
+      closeIndex(restClient, index);
       restClient.performRequest("DELETE", String.format("/%s", index));
     } catch (IOException e) {
       // it is fine to ignore this exception as deleteIndex occurs in @before,
diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOITCommon.java
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOITCommon.java
index 6598e975f35..6ef38bda99a 100644
--- 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOITCommon.java
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOITCommon.java
@@ -18,9 +18,9 @@
 package org.apache.beam.sdk.io.elasticsearch;
 
 import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
-import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_INDEX;
 import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_TYPE;
 import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.NUM_DOCS_ITESTS;
+import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.getEsIndex;
 
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
@@ -54,9 +54,9 @@
 
   /** Enum encapsulating the mode of operation and the index. */
   enum IndexMode {
-    READ(ES_INDEX),
-    WRITE(ES_INDEX + System.currentTimeMillis()),
-    WRITE_PARTIAL(ES_INDEX + "_partial_" + System.currentTimeMillis());
+    READ(getEsIndex()),
+    WRITE(getEsIndex() + System.currentTimeMillis()),
+    WRITE_PARTIAL(getEsIndex() + "_partial_" + System.currentTimeMillis());
 
     private final String index;
 
diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
index 57b450d8c9e..5a8ad788226 100644
--- 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
@@ -82,7 +82,10 @@
       "{ \"index\" : { \"_index\" : \"test\", \"_type\" : \"doc\", \"_id\" : 
\"1\" } }\n"
           + "{ \"field1\" : @ }\n";
 
-  static final String ES_INDEX = "beam";
+  static String getEsIndex() {
+    return "beam" + Thread.currentThread().getId();
+  }
+
   static final String ES_TYPE = "test";
   static final long NUM_DOCS_UTESTS = 400L;
   static final long NUM_DOCS_ITESTS = 50000L;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 139172)
    Time Spent: 1h 20m  (was: 1h 10m)

> org.apache.beam.sdk.io.elasticsearch/ElasticsearchIOTest is flaky
> -----------------------------------------------------------------
>
>                 Key: BEAM-5172
>                 URL: https://issues.apache.org/jira/browse/BEAM-5172
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-elasticsearch, test-failures
>            Reporter: Valentyn Tymofieiev
>            Assignee: Etienne Chauchot
>            Priority: Major
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> In a recent PostCommit build, 
> https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/1290/testReport/junit/org.apache.beam.sdk.io.elasticsearch/ElasticsearchIOTest/testRead/
>  failed with:
> Error Message
> java.lang.AssertionError: Count/Flatten.PCollections.out: 
> Expected: <400L>
>      but: was <470L>
> Stacktrace
> java.lang.AssertionError: Count/Flatten.PCollections.out: 
> Expected: <400L>
>      but: was <470L>
>       at 
> org.apache.beam.sdk.testing.PAssert$PAssertionSite.capture(PAssert.java:168)
>       at org.apache.beam.sdk.testing.PAssert.thatSingleton(PAssert.java:413)
>       at org.apache.beam.sdk.testing.PAssert.thatSingleton(PAssert.java:404)
>       at 
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.testRead(ElasticsearchIOTestCommon.java:124)
>       at 
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTest.testRead(ElasticsearchIOTest.java:125)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>       at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>       at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>       at 
> org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
>       at 
> org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
>       at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>       at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>       at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>       at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>       at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>       at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>       at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>       at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>       at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>       at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>       at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>       at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>       at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
>       at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>       at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>       at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66)
>       at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>       at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>       at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>       at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>       at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>       at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:109)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>       at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>       at 
> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:155)
>       at 
> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:137)
>       at 
> org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
>       at 
> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
>       at 
> org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at 
> org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
> Caused by: java.lang.AssertionError: 
> Expected: <400L>
>      but: was <470L>
>       at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>       at org.junit.Assert.assertThat(Assert.java:956)
>       at org.junit.Assert.assertThat(Assert.java:923)
>       at 
> org.apache.beam.sdk.testing.PAssert$AssertIsEqualTo.apply(PAssert.java:1264)
>       at 
> org.apache.beam.sdk.testing.PAssert$AssertIsEqualTo.apply(PAssert.java:1254)
>       at 
> org.apache.beam.sdk.testing.PAssert$CheckRelationAgainstExpected.apply(PAssert.java:960)
>       at 
> org.apache.beam.sdk.testing.PAssert$CheckRelationAgainstExpected.apply(PAssert.java:940)
>       at org.apache.beam.sdk.testing.PAssert.doChecks(PAssert.java:1241)
>       at 
> org.apache.beam.sdk.testing.PAssert$SideInputCheckerDoFn.processElement(PAssert.java:1185)
>       at 
> org.apache.beam.sdk.testing.PAssert$SideInputCheckerDoFn$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
>       at 
> org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
>       at 
> org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:237)
>       at 
> org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:87)
>       at 
> org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:207)
>       at 
> org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:55)
>       at 
> org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160)
>       at 
> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> Standard Error



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to